From a7ea340f225d78eb64105352f072560f1d83b03f Mon Sep 17 00:00:00 2001 From: carllin Date: Tue, 28 Jul 2020 02:33:27 -0700 Subject: [PATCH] Track votes from gossip for optimistic confirmation (#11209) * Add check in cluster_info_vote_listenere to see if optimstic conf was achieved Add OptimisticConfirmationVerifier * More fixes * Fix merge conflicts * Remove gossip notificatin * Add dashboards * Fix rebase * Count switch votes as well toward optimistic conf * rename Co-authored-by: Carl --- core/src/cluster_info_vote_listener.rs | 727 +++++++++++++----- core/src/consensus.rs | 1 + core/src/lib.rs | 2 + core/src/optimistic_confirmation_verifier.rs | 340 ++++++++ core/src/repair_service.rs | 1 + core/src/replay_stage.rs | 1 + core/src/rpc_pubsub.rs | 3 +- core/src/tpu.rs | 1 + core/src/vote_stake_tracker.rs | 68 ++ core/tests/ledger_cleanup.rs | 12 +- .../dashboards/cluster-monitor.json | 465 +++++++++-- programs/vote/src/vote_transaction.rs | 20 +- 12 files changed, 1368 insertions(+), 273 deletions(-) create mode 100644 core/src/optimistic_confirmation_verifier.rs create mode 100644 core/src/vote_stake_tracker.rs diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 0166fa1fce..96139fe77d 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -2,6 +2,7 @@ use crate::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, consensus::PubkeyVotes, crds_value::CrdsValueLabel, + optimistic_confirmation_verifier::OptimisticConfirmationVerifier, poh_recorder::PohRecorder, pubkey_references::LockedPubkeyReferences, replay_stage::ReplayVotesReceiver, @@ -9,12 +10,14 @@ use crate::{ rpc_subscriptions::RpcSubscriptions, sigverify, verified_vote_packets::VerifiedVotePackets, + vote_stake_tracker::VoteStakeTracker, }; use crossbeam_channel::{ unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Select, Sender as CrossbeamSender, }; use itertools::izip; use log::*; +use solana_ledger::blockstore::Blockstore; use solana_metrics::inc_new_counter_debug; use solana_perf::packet::{self, Packets}; use solana_runtime::{ @@ -22,17 +25,19 @@ use solana_runtime::{ bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE, epoch_stakes::{EpochAuthorizedVoters, EpochStakes}, + stakes::Stakes, }; use solana_sdk::{ clock::{Epoch, Slot}, epoch_schedule::EpochSchedule, + hash::Hash, program_utils::limited_deserialize, pubkey::Pubkey, transaction::Transaction, }; -use solana_vote_program::vote_instruction::VoteInstruction; +use solana_vote_program::{vote_instruction::VoteInstruction, vote_state::Vote}; use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, sync::{ atomic::{AtomicBool, Ordering}, {Arc, Mutex, RwLock}, @@ -55,8 +60,9 @@ pub struct SlotVoteTracker { // to whether or not we've seen the vote on gossip. // True if seen on gossip, false if only seen in replay. voted: HashMap, bool>, + optimistic_votes_tracker: HashMap, updates: Option>>, - total_stake: u64, + total_voted_stake: u64, gossip_only_stake: u64, } @@ -65,6 +71,13 @@ impl SlotVoteTracker { pub fn get_updates(&mut self) -> Option>> { self.updates.take() } + + pub fn get_or_insert_optimistic_votes_tracker(&mut self, hash: Hash) -> &mut VoteStakeTracker { + self.optimistic_votes_tracker.entry(hash).or_default() + } + pub fn optimistic_votes_tracker(&self, hash: &Hash) -> Option<&VoteStakeTracker> { + self.optimistic_votes_tracker.get(hash) + } } #[derive(Default)] @@ -97,6 +110,27 @@ impl VoteTracker { vote_tracker } + pub fn get_or_insert_slot_tracker(&self, slot: Slot) -> Arc> { + let mut slot_tracker = self.slot_vote_trackers.read().unwrap().get(&slot).cloned(); + + if slot_tracker.is_none() { + let new_slot_tracker = Arc::new(RwLock::new(SlotVoteTracker { + voted: HashMap::new(), + optimistic_votes_tracker: HashMap::default(), + updates: None, + total_voted_stake: 0, + gossip_only_stake: 0, + })); + self.slot_vote_trackers + .write() + .unwrap() + .insert(slot, new_slot_tracker.clone()); + slot_tracker = Some(new_slot_tracker); + } + + slot_tracker.unwrap() + } + pub fn get_slot_vote_tracker(&self, slot: Slot) -> Option>> { self.slot_vote_trackers.read().unwrap().get(&slot).cloned() } @@ -207,6 +241,7 @@ pub struct ClusterInfoVoteListener { } impl ClusterInfoVoteListener { + #[allow(clippy::too_many_arguments)] pub fn new( exit: &Arc, cluster_info: Arc, @@ -217,6 +252,7 @@ impl ClusterInfoVoteListener { subscriptions: Arc, verified_vote_sender: VerifiedVoteSender, replay_votes_receiver: ReplayVotesReceiver, + blockstore: Arc, ) -> Self { let exit_ = exit.clone(); @@ -257,10 +293,11 @@ impl ClusterInfoVoteListener { exit_, verified_vote_transactions_receiver, vote_tracker, - &bank_forks, + bank_forks, subscriptions, verified_vote_sender, replay_votes_receiver, + blockstore, ); }) .unwrap(); @@ -383,28 +420,45 @@ impl ClusterInfoVoteListener { exit: Arc, vote_txs_receiver: VerifiedVoteTransactionsReceiver, vote_tracker: Arc, - bank_forks: &RwLock, + bank_forks: Arc>, subscriptions: Arc, verified_vote_sender: VerifiedVoteSender, replay_votes_receiver: ReplayVotesReceiver, + blockstore: Arc, ) -> Result<()> { + let mut optimistic_confirmation_verifier = + OptimisticConfirmationVerifier::new(bank_forks.read().unwrap().root()); + let mut last_process_root = Instant::now(); loop { if exit.load(Ordering::Relaxed) { return Ok(()); } let root_bank = bank_forks.read().unwrap().root_bank().clone(); - vote_tracker.process_new_root_bank(&root_bank); - let epoch_stakes = root_bank.epoch_stakes(root_bank.epoch()); - if let Err(e) = Self::get_and_process_votes( + if last_process_root.elapsed().as_millis() > 400 { + let unrooted_optimistic_slots = optimistic_confirmation_verifier + .get_unrooted_optimistic_slots(&root_bank, &blockstore); + // SlotVoteTracker's for all `slots` in `unrooted_optimistic_slots` + // should still be available because we haven't purged in + // `process_new_root_bank()` yet, which is called below + OptimisticConfirmationVerifier::log_unrooted_optimistic_slots( + &root_bank, + &vote_tracker, + &unrooted_optimistic_slots, + ); + vote_tracker.process_new_root_bank(&root_bank); + last_process_root = Instant::now(); + } + let optimistic_confirmed_slots = Self::get_and_process_votes( &vote_txs_receiver, &vote_tracker, - root_bank.slot(), + &root_bank, &subscriptions, - epoch_stakes, &verified_vote_sender, &replay_votes_receiver, - ) { + ); + + if let Err(e) = optimistic_confirmed_slots { match e { Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) | Error::ReadyTimeoutError => (), @@ -412,6 +466,10 @@ impl ClusterInfoVoteListener { error!("thread {:?} error {:?}", thread::current().name(), e); } } + } else { + let optimistic_confirmed_slots = optimistic_confirmed_slots.unwrap(); + optimistic_confirmation_verifier + .add_new_optimistic_confirmed_slots(optimistic_confirmed_slots); } } } @@ -420,17 +478,16 @@ impl ClusterInfoVoteListener { pub fn get_and_process_votes_for_tests( vote_txs_receiver: &VerifiedVoteTransactionsReceiver, vote_tracker: &VoteTracker, - last_root: Slot, + root_bank: &Bank, subscriptions: &RpcSubscriptions, verified_vote_sender: &VerifiedVoteSender, replay_votes_receiver: &ReplayVotesReceiver, - ) -> Result<()> { + ) -> Result> { Self::get_and_process_votes( vote_txs_receiver, vote_tracker, - last_root, + root_bank, subscriptions, - None, verified_vote_sender, replay_votes_receiver, ) @@ -439,12 +496,11 @@ impl ClusterInfoVoteListener { fn get_and_process_votes( vote_txs_receiver: &VerifiedVoteTransactionsReceiver, vote_tracker: &VoteTracker, - last_root: Slot, + root_bank: &Bank, subscriptions: &RpcSubscriptions, - epoch_stakes: Option<&EpochStakes>, verified_vote_sender: &VerifiedVoteSender, replay_votes_receiver: &ReplayVotesReceiver, - ) -> Result<()> { + ) -> Result> { let mut sel = Select::new(); sel.recv(vote_txs_receiver); sel.recv(replay_votes_receiver); @@ -459,62 +515,72 @@ impl ClusterInfoVoteListener { // disconnected. `ready_timeout` can wake up spuriously, // hence the loop let _ = sel.ready_timeout(Duration::from_millis(remaining_wait_time))?; + + // Should not early return from this point onwards until `process_votes()` + // returns below to avoid missing any potential `optimistic_confirmed_slots` let vote_txs: Vec<_> = vote_txs_receiver.try_iter().flatten().collect(); let replay_votes: Vec<_> = replay_votes_receiver.try_iter().collect(); if !vote_txs.is_empty() || !replay_votes.is_empty() { - Self::process_votes( + return Ok(Self::process_votes( vote_tracker, vote_txs, - last_root, + root_bank, subscriptions, - epoch_stakes, verified_vote_sender, &replay_votes, - ); - break; + )); } else { remaining_wait_time = remaining_wait_time .saturating_sub(std::cmp::max(start.elapsed().as_millis() as u64, 1)); } } - Ok(()) + Ok(vec![]) + } + + fn parse_vote_transaction(tx: &Transaction) -> Option<(Pubkey, Vote, Option)> { + // Check first instruction for a vote + tx.message + .instructions + .first() + .and_then(|first_instruction| { + first_instruction + .accounts + .first() + .and_then(|first_account| { + tx.message + .account_keys + .get(*first_account as usize) + .and_then(|key| { + let vote_instruction = + limited_deserialize(&first_instruction.data).ok(); + vote_instruction.and_then(|vote_instruction| match vote_instruction + { + VoteInstruction::Vote(vote) => Some((*key, vote, None)), + VoteInstruction::VoteSwitch(vote, hash) => { + Some((*key, vote, Some(hash))) + } + _ => None, + }) + }) + }) + }) } fn process_votes( vote_tracker: &VoteTracker, vote_txs: Vec, - root: Slot, + root_bank: &Bank, subscriptions: &RpcSubscriptions, - epoch_stakes: Option<&EpochStakes>, verified_vote_sender: &VerifiedVoteSender, replay_votes: &[Arc], - ) { + ) -> Vec<(Slot, Hash)> { + let mut optimistic_confirmation_counted: HashSet<(Slot, Pubkey)> = HashSet::new(); let mut diff: HashMap, bool>> = HashMap::new(); + let mut new_optimistic_confirmed_slots = vec![]; + let root = root_bank.slot(); { for tx in vote_txs { - if let (Some(vote_pubkey), Some(vote_instruction)) = tx - .message - .instructions - .first() - .and_then(|first_instruction| { - first_instruction.accounts.first().map(|offset| { - ( - tx.message.account_keys.get(*offset as usize), - limited_deserialize(&first_instruction.data).ok(), - ) - }) - }) - .unwrap_or((None, None)) - { - let vote = { - match vote_instruction { - VoteInstruction::Vote(vote) => vote, - _ => { - continue; - } - } - }; - + if let Some((vote_pubkey, vote, _)) = Self::parse_vote_transaction(&tx) { if vote.slots.is_empty() { continue; } @@ -539,19 +605,64 @@ impl ClusterInfoVoteListener { continue; } - for &slot in vote.slots.iter() { - if slot <= root { + let root = root_bank.slot(); + let last_vote_hash = vote.hash; + for slot in &vote.slots { + // If slot is before the root, or so far ahead we don't have + // stake information, then ignore it + let epoch = root_bank.epoch_schedule().get_epoch(*slot); + let epoch_stakes = root_bank.epoch_stakes(epoch); + if *slot <= root || epoch_stakes.is_none() { continue; } + let epoch_stakes = epoch_stakes.unwrap(); + let epoch_vote_accounts = Stakes::vote_accounts(epoch_stakes.stakes()); + let total_epoch_stake = epoch_stakes.total_stake(); - let unduplicated_pubkey = vote_tracker.keys.get_or_insert(vote_pubkey); - diff.entry(slot) + let unduplicated_pubkey = vote_tracker.keys.get_or_insert(&vote_pubkey); + + // The last vote slot , which is the greatest slot in the stack + // of votes in a vote transaction, qualifies for optimistic confirmation. + let update_optimistic_confirmation_info = if slot == last_vote_slot { + let stake = epoch_vote_accounts + .get(&vote_pubkey) + .map(|(stake, _)| *stake) + .unwrap_or(0); + Some((stake, last_vote_hash)) + } else { + None + }; + + // If this vote for this slot qualifies for optimistic confirmation + if let Some((stake, hash)) = update_optimistic_confirmation_info { + // Fast track processing of the last slot in a vote transactions + // so that notifications for optimistic confirmation can be sent + // as soon as possible. + if !optimistic_confirmation_counted + .contains(&(*slot, *unduplicated_pubkey)) + && Self::add_optimistic_confirmation_vote( + vote_tracker, + *slot, + hash, + unduplicated_pubkey.clone(), + stake, + total_epoch_stake, + ) + { + optimistic_confirmation_counted + .insert((*slot, *unduplicated_pubkey)); + new_optimistic_confirmed_slots.push((*slot, last_vote_hash)); + // TODO: Notify subscribers about new optimistic confirmation + } + } + + diff.entry(*slot) .or_default() .insert(unduplicated_pubkey, true); } subscriptions.notify_vote(&vote); - let _ = verified_vote_sender.send((*vote_pubkey, vote.slots)); + let _ = verified_vote_sender.send((vote_pubkey, vote.slots)); } } } @@ -570,108 +681,91 @@ impl ClusterInfoVoteListener { } } + // Process all the slots accumulated from replay and gossip. for (slot, mut slot_diff) in diff { - let slot_tracker = vote_tracker - .slot_vote_trackers - .read() - .unwrap() - .get(&slot) - .cloned(); - if let Some(slot_tracker) = slot_tracker { - { - let r_slot_tracker = slot_tracker.read().unwrap(); - // Only keep the pubkeys we haven't seen voting for this slot - slot_diff.retain(|pubkey, seen_in_gossip_above| { - let seen_in_gossip_previously = r_slot_tracker.voted.get(pubkey); - let is_new = seen_in_gossip_previously.is_none(); - if is_new && !*seen_in_gossip_above { - // If this vote wasn't seen in gossip, then it must be a - // replay vote, and we haven't sent a notification for - // those yet - let _ = verified_vote_sender.send((**pubkey, vec![slot])); - } + let slot_tracker = vote_tracker.get_or_insert_slot_tracker(slot); + { + let r_slot_tracker = slot_tracker.read().unwrap(); + // Only keep the pubkeys we haven't seen voting for this slot + slot_diff.retain(|pubkey, seen_in_gossip_above| { + let seen_in_gossip_previously = r_slot_tracker.voted.get(pubkey); + let is_new = seen_in_gossip_previously.is_none(); + if is_new && !*seen_in_gossip_above { + // If this vote wasn't seen in gossip, then it must be a + // replay vote, and we haven't sent a notification for + // those yet + let _ = verified_vote_sender.send((**pubkey, vec![slot])); + } - // `is_new_from_gossip` means we observed a vote for this slot - // for the first time in gossip - let is_new_from_gossip = - !seen_in_gossip_previously.cloned().unwrap_or(false) - && *seen_in_gossip_above; - is_new || is_new_from_gossip - }); - } - let mut w_slot_tracker = slot_tracker.write().unwrap(); - if w_slot_tracker.updates.is_none() { - w_slot_tracker.updates = Some(vec![]); - } - let mut current_stake = 0; - let mut gossip_only_stake = 0; - for (pubkey, seen_in_gossip_above) in slot_diff { - let is_new = !w_slot_tracker.voted.contains_key(&pubkey); - Self::sum_stake( - &mut current_stake, - &mut gossip_only_stake, - epoch_stakes, - &pubkey, - // By this point we know if the vote was seen in gossip above, - // it was not seen in gossip at any point in the past, so it's - // safe to pass this in here as an overall indicator of whether - // this vote is new - seen_in_gossip_above, - is_new, - ); - - // From the `slot_diff.retain` earlier, we know because there are - // no other writers to `slot_vote_tracker` that - // `is_new || is_new_from_gossip`. In both cases we want to record - // `is_new_from_gossip` for the `pubkey` entry. - w_slot_tracker - .voted - .insert(pubkey.clone(), seen_in_gossip_above); - w_slot_tracker.updates.as_mut().unwrap().push(pubkey); - } - Self::notify_for_stake_change( - current_stake, - w_slot_tracker.total_stake, - &subscriptions, - epoch_stakes, - slot, - ); - w_slot_tracker.total_stake += current_stake; - w_slot_tracker.gossip_only_stake += gossip_only_stake - } else { - let mut total_stake = 0; - let mut gossip_only_stake = 0; - let voted: HashMap<_, _> = slot_diff - .into_iter() - .map(|(pubkey, seen_in_gossip_above)| { - if !seen_in_gossip_above { - let _ = verified_vote_sender.send((*pubkey, vec![slot])); - } - Self::sum_stake( - &mut total_stake, - &mut gossip_only_stake, - epoch_stakes, - &pubkey, - seen_in_gossip_above, - true, - ); - (pubkey, seen_in_gossip_above) - }) - .collect(); - Self::notify_for_stake_change(total_stake, 0, &subscriptions, epoch_stakes, slot); - let new_slot_tracker = SlotVoteTracker { - updates: Some(voted.keys().cloned().collect()), - voted, - total_stake, - gossip_only_stake, - }; - vote_tracker - .slot_vote_trackers - .write() - .unwrap() - .insert(slot, Arc::new(RwLock::new(new_slot_tracker))); + // `is_new_from_gossip` means we observed a vote for this slot + // for the first time in gossip + let is_new_from_gossip = !seen_in_gossip_previously.cloned().unwrap_or(false) + && *seen_in_gossip_above; + is_new || is_new_from_gossip + }); } + let mut w_slot_tracker = slot_tracker.write().unwrap(); + if w_slot_tracker.updates.is_none() { + w_slot_tracker.updates = Some(vec![]); + } + let mut current_stake = 0; + let mut gossip_only_stake = 0; + let epoch = root_bank.epoch_schedule().get_epoch(slot); + let epoch_stakes = root_bank.epoch_stakes(epoch); + + for (pubkey, seen_in_gossip_above) in slot_diff { + let is_new = !w_slot_tracker.voted.contains_key(&pubkey); + Self::sum_stake( + &mut current_stake, + &mut gossip_only_stake, + epoch_stakes, + &pubkey, + // By this point we know if the vote was seen in gossip above, + // it was not seen in gossip at any point in the past, so it's + // safe to pass this in here as an overall indicator of whether + // this vote is new + seen_in_gossip_above, + is_new, + ); + + // From the `slot_diff.retain` earlier, we know because there are + // no other writers to `slot_vote_tracker` that + // `is_new || is_new_from_gossip`. In both cases we want to record + // `is_new_from_gossip` for the `pubkey` entry. + w_slot_tracker + .voted + .insert(pubkey.clone(), seen_in_gossip_above); + w_slot_tracker.updates.as_mut().unwrap().push(pubkey); + } + Self::notify_for_stake_change( + current_stake, + w_slot_tracker.total_voted_stake, + &subscriptions, + epoch_stakes, + slot, + ); + w_slot_tracker.total_voted_stake += current_stake; + w_slot_tracker.gossip_only_stake += gossip_only_stake } + new_optimistic_confirmed_slots + } + + // Returns if the slot was optimistically confirmed + fn add_optimistic_confirmation_vote( + vote_tracker: &VoteTracker, + slot: Slot, + hash: Hash, + pubkey: Arc, + stake: u64, + total_epoch_stake: u64, + ) -> bool { + let slot_tracker = vote_tracker.get_or_insert_slot_tracker(slot); + // Insert vote and check for optimistic confirmation + let mut w_slot_tracker = slot_tracker.write().unwrap(); + + w_slot_tracker + .get_or_insert_optimistic_votes_tracker(hash) + .add_vote_pubkey(pubkey, stake, total_epoch_stake) } fn notify_for_stake_change( @@ -719,13 +813,14 @@ impl ClusterInfoVoteListener { #[cfg(test)] mod tests { use super::*; + use crate::replay_stage::ReplayVotesSender; use solana_perf::packet; use solana_runtime::{ bank::Bank, commitment::BlockCommitmentCache, genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}, }; - use solana_sdk::hash::Hash; + use solana_sdk::hash::{self, Hash}; use solana_sdk::signature::Signature; use solana_sdk::signature::{Keypair, Signer}; use solana_vote_program::vote_transaction; @@ -745,6 +840,7 @@ mod tests { &node_keypair, &vote_keypair, &vote_keypair, + Some(Hash::default()), ); use bincode::serialized_size; @@ -755,8 +851,7 @@ mod tests { assert_eq!(msgs.len(), 1); } - #[test] - fn vote_contains_authorized_voter() { + fn run_vote_contains_authorized_voter(hash: Option) { let node_keypair = Keypair::new(); let vote_keypair = Keypair::new(); let authorized_voter = Keypair::new(); @@ -768,6 +863,7 @@ mod tests { &node_keypair, &vote_keypair, &authorized_voter, + hash, ); // Check that the two signing keys pass the check @@ -795,6 +891,7 @@ mod tests { &node_keypair, &vote_keypair, &vote_keypair, + hash, ); // Check that the node_keypair and vote keypair pass the authorized voter check @@ -808,13 +905,19 @@ mod tests { &vote_keypair.pubkey() )); - // The other keypair should not pss the cchecck + // The other keypair should not pass the check assert!(!VoteTracker::vote_contains_authorized_voter( &vote_tx, &authorized_voter.pubkey() )); } + #[test] + fn test_vote_contains_authorized_voter() { + run_vote_contains_authorized_voter(None); + run_vote_contains_authorized_voter(Some(Hash::default())); + } + #[test] fn test_update_new_root() { let (vote_tracker, bank, _, _) = setup(); @@ -898,25 +1001,98 @@ mod tests { } #[test] - fn test_process_votes() { + fn test_votes_in_range() { // Create some voters at genesis + let stake_per_validator = 100; let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); let (votes_sender, votes_receiver) = unbounded(); - let (verified_vote_sender, verified_vote_receiver) = unbounded(); + let (verified_vote_sender, _verified_vote_receiver) = unbounded(); let (replay_votes_sender, replay_votes_receiver) = unbounded(); + let GenesisConfigInfo { genesis_config, .. } = + genesis_utils::create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, + vec![stake_per_validator; validator_voting_keypairs.len()], + ); + + let bank0 = Bank::new(&genesis_config); + // Votes for slots less than the provided root bank's slot should not be processed + let bank3 = Arc::new(Bank::new_from_parent( + &Arc::new(bank0), + &Pubkey::default(), + 3, + )); let vote_slots = vec![1, 2]; - let replay_vote_slots = vec![3, 4]; + send_vote_txs( + vote_slots, + vec![], + &validator_voting_keypairs, + None, + &votes_sender, + &replay_votes_sender, + ); + ClusterInfoVoteListener::get_and_process_votes( + &votes_receiver, + &vote_tracker, + &bank3, + &subscriptions, + &verified_vote_sender, + &replay_votes_receiver, + ) + .unwrap(); + + // Vote slots for slots greater than root bank's set of currently calculated epochs + // are ignored + let max_epoch = bank3.get_leader_schedule_epoch(bank3.slot()); + assert!(bank3.epoch_stakes(max_epoch).is_some()); + let unknown_epoch = max_epoch + 1; + assert!(bank3.epoch_stakes(unknown_epoch).is_none()); + let first_slot_in_unknown_epoch = bank3 + .epoch_schedule() + .get_first_slot_in_epoch(unknown_epoch); + let vote_slots = vec![first_slot_in_unknown_epoch, first_slot_in_unknown_epoch + 1]; + send_vote_txs( + vote_slots, + vec![], + &validator_voting_keypairs, + None, + &votes_sender, + &replay_votes_sender, + ); + ClusterInfoVoteListener::get_and_process_votes( + &votes_receiver, + &vote_tracker, + &bank3, + &subscriptions, + &verified_vote_sender, + &replay_votes_receiver, + ) + .unwrap(); + + // Should be no updates since everything was ignored + assert!(vote_tracker.slot_vote_trackers.read().unwrap().is_empty()); + } + + fn send_vote_txs( + gossip_vote_slots: Vec, + replay_vote_slots: Vec, + validator_voting_keypairs: &[ValidatorVoteKeypairs], + hash: Option, + votes_sender: &VerifiedVoteTransactionsSender, + replay_votes_sender: &ReplayVotesSender, + ) { validator_voting_keypairs.iter().for_each(|keypairs| { let node_keypair = &keypairs.node_keypair; let vote_keypair = &keypairs.vote_keypair; let vote_tx = vote_transaction::new_vote_transaction( - vote_slots.clone(), + gossip_vote_slots.clone(), Hash::default(), Hash::default(), node_keypair, vote_keypair, vote_keypair, + hash, ); votes_sender.send(vec![vote_tx]).unwrap(); for vote_slot in &replay_vote_slots { @@ -929,14 +1105,41 @@ mod tests { .unwrap(); } }); + } + + fn run_test_process_votes(hash: Option) { + // Create some voters at genesis + let stake_per_validator = 100; + let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); + let (votes_txs_sender, votes_txs_receiver) = unbounded(); + let (replay_votes_sender, replay_votes_receiver) = unbounded(); + let (verified_vote_sender, verified_vote_receiver) = unbounded(); + + let GenesisConfigInfo { genesis_config, .. } = + genesis_utils::create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, + vec![stake_per_validator; validator_voting_keypairs.len()], + ); + let bank0 = Bank::new(&genesis_config); + + let gossip_vote_slots = vec![1, 2]; + let replay_vote_slots = vec![3, 4]; + send_vote_txs( + gossip_vote_slots.clone(), + replay_vote_slots.clone(), + &validator_voting_keypairs, + hash, + &votes_txs_sender, + &replay_votes_sender, + ); // Check that all the votes were registered for each validator correctly ClusterInfoVoteListener::get_and_process_votes( - &votes_receiver, + &votes_txs_receiver, &vote_tracker, - 0, + &bank0, &subscriptions, - None, &verified_vote_sender, &replay_votes_receiver, ) @@ -944,7 +1147,7 @@ mod tests { // Check that the received votes were pushed to other commponents // subscribing via `verified_vote_receiver` - let all_expected_slots: BTreeSet<_> = vote_slots + let all_expected_slots: BTreeSet<_> = gossip_vote_slots .into_iter() .chain(replay_vote_slots.into_iter()) .collect(); @@ -978,21 +1181,57 @@ mod tests { .as_ref() .unwrap() .contains(&Arc::new(pubkey))); + // Only the last vote in the stack of `gossip_votes` should count towards + // the `optimistic` vote set. + let optimistic_votes_tracker = + r_slot_vote_tracker.optimistic_votes_tracker(&Hash::default()); + if vote_slot == 2 { + let optimistic_votes_tracker = optimistic_votes_tracker.unwrap(); + assert!(optimistic_votes_tracker.voted().contains(&pubkey)); + assert_eq!( + optimistic_votes_tracker.stake(), + stake_per_validator * validator_voting_keypairs.len() as u64 + ); + } else { + assert!(optimistic_votes_tracker.is_none()) + } } } } + #[test] + fn test_process_votes1() { + run_test_process_votes(None); + run_test_process_votes(Some(Hash::default())); + } + #[test] fn test_process_votes2() { // Create some voters at genesis let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); + + // Create bank with the voters + let stake_per_validator = 100; + let GenesisConfigInfo { genesis_config, .. } = + genesis_utils::create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, + vec![stake_per_validator; validator_voting_keypairs.len()], + ); + let bank0 = Bank::new(&genesis_config); + // Send some votes to process let (votes_txs_sender, votes_txs_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded(); let (_replay_votes_sender, replay_votes_receiver) = unbounded(); let mut expected_votes = vec![]; - for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() { + let num_voters_per_slot = 2; + let bank_hash = Hash::default(); + for (i, keyset) in validator_voting_keypairs + .chunks(num_voters_per_slot) + .enumerate() + { let validator_votes: Vec<_> = keyset .iter() .map(|keypairs| { @@ -1001,11 +1240,12 @@ mod tests { expected_votes.push((vote_keypair.pubkey(), vec![i as Slot + 1])); vote_transaction::new_vote_transaction( vec![i as u64 + 1], - Hash::default(), + bank_hash, Hash::default(), node_keypair, vote_keypair, vote_keypair, + None, ) }) .collect(); @@ -1016,9 +1256,8 @@ mod tests { ClusterInfoVoteListener::get_and_process_votes( &votes_txs_receiver, &vote_tracker, - 0, + &bank0, &subscriptions, - None, &verified_vote_sender, &replay_votes_receiver, ) @@ -1046,12 +1285,21 @@ mod tests { .as_ref() .unwrap() .contains(&Arc::new(pubkey))); + // All the votes were single votes, so they should all count towards + // the optimistic confirmation vote set + let optimistic_votes_tracker = r_slot_vote_tracker + .optimistic_votes_tracker(&bank_hash) + .unwrap(); + assert!(optimistic_votes_tracker.voted().contains(&pubkey)); + assert_eq!( + optimistic_votes_tracker.stake(), + num_voters_per_slot as u64 * stake_per_validator + ); } } } - #[test] - fn test_process_votes3() { + fn run_test_process_votes3(hash: Option) { let (votes_sender, votes_receiver) = unbounded(); let (verified_vote_sender, _verified_vote_receiver) = unbounded(); let (replay_votes_sender, replay_votes_receiver) = unbounded(); @@ -1085,6 +1333,7 @@ mod tests { node_keypair, vote_keypair, vote_keypair, + hash, ); votes_sender.send(vec![vote_tx.clone()]).unwrap(); } @@ -1096,13 +1345,8 @@ mod tests { let _ = ClusterInfoVoteListener::get_and_process_votes( &votes_receiver, &vote_tracker, - 0, + &bank, &subscriptions, - Some( - // Make sure `epoch_stakes` exists for this slot by unwrapping - bank.epoch_stakes(bank.epoch_schedule().get_epoch(vote_slot)) - .unwrap(), - ), &verified_vote_sender, &replay_votes_receiver, ); @@ -1112,17 +1356,23 @@ mod tests { if events == vec![1] { // Check `gossip_only_stake` is not incremented - assert_eq!(r_slot_vote_tracker.total_stake, 100); + assert_eq!(r_slot_vote_tracker.total_voted_stake, 100); assert_eq!(r_slot_vote_tracker.gossip_only_stake, 0); } else { - // Check that both the `gossip_only_stake` and `total_stake` both + // Check that both the `gossip_only_stake` and `total_voted_stake` both // increased - assert_eq!(r_slot_vote_tracker.total_stake, 100); + assert_eq!(r_slot_vote_tracker.total_voted_stake, 100); assert_eq!(r_slot_vote_tracker.gossip_only_stake, 100); } } } + #[test] + fn test_run_test_process_votes3() { + run_test_process_votes3(None); + run_test_process_votes3(Some(Hash::default())); + } + #[test] fn test_get_voters_by_epoch() { // Create some voters at genesis @@ -1179,9 +1429,14 @@ mod tests { #[test] fn test_vote_tracker_references() { // The number of references that get stored for a pubkey every time - // a vote is made. One stored in the SlotVoteTracker.voted, one in - // SlotVoteTracker.updates - let ref_count_per_vote = 2; + // a vote is added to the tracking set via a transaction. One stored in the + // SlotVoteTracker.voted, one in SlotVoteTracker.updates, one in + // SlotVoteTracker.optimistic_votes_tracker + let ref_count_per_vote = 3; + // Replay votes don't get added to `SlotVoteTracker.optimistic_votes_tracker`, + // so there's one less + let ref_count_per_replay_vote = ref_count_per_vote - 1; + let ref_count_per_new_key = 1; // Create some voters at genesis let validator_keypairs: Vec<_> = @@ -1216,15 +1471,15 @@ mod tests { &validator0_keypairs.node_keypair, &validator0_keypairs.vote_keypair, &validator0_keypairs.vote_keypair, + None, )]; let (verified_vote_sender, _verified_vote_receiver) = unbounded(); ClusterInfoVoteListener::process_votes( &vote_tracker, vote_tx, - 0, + &bank, &subscriptions, - None, &verified_vote_sender, // Add vote for same slot, should not affect outcome &[Arc::new(vec![( @@ -1242,9 +1497,11 @@ mod tests { .unwrap(), ); - // This pubkey voted for a slot, so ref count is `ref_count_per_vote + 1`, - // +1 in `vote_tracker.keys` and +ref_count_per_vote for the one vote - let mut current_ref_count = ref_count_per_vote + 1; + // This new pubkey submitted a vote for a slot, so ref count is + // `ref_count_per_vote + ref_count_per_new_key`. + // +ref_count_per_new_key for the new pubkey in `vote_tracker.keys` and + // +ref_count_per_vote for the one new vote + let mut current_ref_count = ref_count_per_vote + ref_count_per_new_key; assert_eq!(ref_count, current_ref_count); // Setup next epoch @@ -1267,8 +1524,9 @@ mod tests { // Make 2 new votes in two different epochs for the same pubkey, // the ref count should go up by 3 * ref_count_per_vote - // Add 1 vote through the replay channel, ref count should - let vote_txs: Vec<_> = [bank.slot() + 2, first_slot_in_new_epoch] + // Add 1 vote through the replay channel for a different pubkey, + // ref count should equal `current_ref_count` for that pubkey. + let vote_txs: Vec<_> = [first_slot_in_new_epoch - 1, first_slot_in_new_epoch] .iter() .map(|slot| { vote_transaction::new_vote_transaction( @@ -1279,16 +1537,18 @@ mod tests { &validator0_keypairs.node_keypair, &validator0_keypairs.vote_keypair, &validator0_keypairs.vote_keypair, + None, ) }) .collect(); + let new_root_bank = + Bank::new_from_parent(&bank, &Pubkey::default(), first_slot_in_new_epoch - 2); ClusterInfoVoteListener::process_votes( &vote_tracker, vote_txs, - 0, + &new_root_bank, &subscriptions, - None, &verified_vote_sender, &[Arc::new(vec![( validator_keypairs[1].vote_keypair.pubkey(), @@ -1306,7 +1566,11 @@ mod tests { .get(&validator_keypairs[1].vote_keypair.pubkey()) .unwrap(), ); - assert_eq!(ref_count, current_ref_count); + // This new pubkey submitted a replay vote for a slot, so ref count is + // `ref_count_per_optimistic_vote + ref_count_per_new_key`. + // +ref_count_per_new_key for the new pubkey in `vote_tracker.keys` and + // +ref_count_per_optimistic_vote for the one new vote + assert_eq!(ref_count, ref_count_per_replay_vote + ref_count_per_new_key); // Check the existing pubkey let ref_count = Arc::strong_count( @@ -1393,7 +1657,7 @@ mod tests { assert_eq!(num_packets, ref_value); } - fn test_vote_tx() -> Transaction { + fn test_vote_tx(hash: Option) -> Transaction { let node_keypair = Keypair::new(); let vote_keypair = Keypair::new(); let auth_voter_keypair = Keypair::new(); @@ -1404,12 +1668,12 @@ mod tests { &node_keypair, &vote_keypair, &auth_voter_keypair, + hash, ) } - #[test] - fn test_verify_votes_1_pass() { - let vote_tx = test_vote_tx(); + fn run_test_verify_votes_1_pass(hash: Option) { + let vote_tx = test_vote_tx(hash); let votes = vec![vote_tx]; let labels = vec![CrdsValueLabel::Vote(0, Pubkey::new_rand())]; let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, labels); @@ -1418,8 +1682,13 @@ mod tests { } #[test] - fn test_bad_vote() { - let vote_tx = test_vote_tx(); + fn test_verify_votes_1_pass() { + run_test_verify_votes_1_pass(None); + run_test_verify_votes_1_pass(Some(Hash::default())); + } + + fn run_test_bad_vote(hash: Option) { + let vote_tx = test_vote_tx(hash); let mut bad_vote = vote_tx.clone(); bad_vote.signatures[0] = Signature::default(); let votes = vec![vote_tx.clone(), bad_vote, vote_tx]; @@ -1438,69 +1707,101 @@ mod tests { // If `is_new_from_gossip` and `is_new` are both true, both fields // should increase - let mut total_stake = 0; + let mut total_voted_stake = 0; let mut gossip_only_stake = 0; let is_new_from_gossip = true; let is_new = true; ClusterInfoVoteListener::sum_stake( - &mut total_stake, + &mut total_voted_stake, &mut gossip_only_stake, Some(epoch_stakes), &vote_keypair.pubkey(), is_new_from_gossip, is_new, ); - assert_eq!(total_stake, 100); + assert_eq!(total_voted_stake, 100); assert_eq!(gossip_only_stake, 100); // If `is_new_from_gossip` and `is_new` are both false, none should increase - let mut total_stake = 0; + let mut total_voted_stake = 0; let mut gossip_only_stake = 0; let is_new_from_gossip = false; let is_new = false; ClusterInfoVoteListener::sum_stake( - &mut total_stake, + &mut total_voted_stake, &mut gossip_only_stake, Some(epoch_stakes), &vote_keypair.pubkey(), is_new_from_gossip, is_new, ); - assert_eq!(total_stake, 0); + assert_eq!(total_voted_stake, 0); assert_eq!(gossip_only_stake, 0); // If only `is_new`, but not `is_new_from_gossip` then - // `total_stake` will increase, but `gossip_only_stake` won't - let mut total_stake = 0; + // `total_voted_stake` will increase, but `gossip_only_stake` won't + let mut total_voted_stake = 0; let mut gossip_only_stake = 0; let is_new_from_gossip = false; let is_new = true; ClusterInfoVoteListener::sum_stake( - &mut total_stake, + &mut total_voted_stake, &mut gossip_only_stake, Some(epoch_stakes), &vote_keypair.pubkey(), is_new_from_gossip, is_new, ); - assert_eq!(total_stake, 100); + assert_eq!(total_voted_stake, 100); assert_eq!(gossip_only_stake, 0); // If only `is_new_from_gossip`, but not `is_new` then - // `gossip_only_stake` will increase, but `total_stake` won't - let mut total_stake = 0; + // `gossip_only_stake` will increase, but `total_voted_stake` won't + let mut total_voted_stake = 0; let mut gossip_only_stake = 0; let is_new_from_gossip = true; let is_new = false; ClusterInfoVoteListener::sum_stake( - &mut total_stake, + &mut total_voted_stake, &mut gossip_only_stake, Some(epoch_stakes), &vote_keypair.pubkey(), is_new_from_gossip, is_new, ); - assert_eq!(total_stake, 0); + assert_eq!(total_voted_stake, 0); assert_eq!(gossip_only_stake, 100); } + + #[test] + fn test_bad_vote() { + run_test_bad_vote(None); + run_test_bad_vote(Some(Hash::default())); + } + + fn run_test_parse_vote_transaction(input_hash: Option) { + let node_keypair = Keypair::new(); + let vote_keypair = Keypair::new(); + let auth_voter_keypair = Keypair::new(); + let bank_hash = Hash::default(); + let vote_tx = vote_transaction::new_vote_transaction( + vec![42], + bank_hash, + Hash::default(), + &node_keypair, + &vote_keypair, + &auth_voter_keypair, + input_hash, + ); + let (key, vote, hash) = ClusterInfoVoteListener::parse_vote_transaction(&vote_tx).unwrap(); + assert_eq!(hash, input_hash); + assert_eq!(vote, Vote::new(vec![42], bank_hash)); + assert_eq!(key, vote_keypair.pubkey()); + } + + #[test] + fn test_parse_vote_transaction() { + run_test_parse_vote_transaction(None); + run_test_parse_vote_transaction(Some(hash::hash(&[42u8]))); + } } diff --git a/core/src/consensus.rs b/core/src/consensus.rs index bd05c1ad91..c0d9138bd5 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -762,6 +762,7 @@ pub mod test { &keypairs.node_keypair, &keypairs.vote_keypair, &keypairs.vote_keypair, + None, ); info!("voting {} {}", parent_bank.slot(), parent_bank.hash()); new_bank.process_transaction(&vote_tx).unwrap(); diff --git a/core/src/lib.rs b/core/src/lib.rs index bd3a21d8df..5b353ea247 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -35,6 +35,7 @@ pub mod heaviest_subtree_fork_choice; pub mod ledger_cleanup_service; pub mod local_vote_signer_service; pub mod non_circulating_supply; +pub mod optimistic_confirmation_verifier; pub mod poh_recorder; pub mod poh_service; pub mod progress_map; @@ -66,6 +67,7 @@ pub mod tree_diff; pub mod tvu; pub mod validator; pub mod verified_vote_packets; +pub mod vote_stake_tracker; pub mod weighted_shuffle; pub mod window_service; diff --git a/core/src/optimistic_confirmation_verifier.rs b/core/src/optimistic_confirmation_verifier.rs new file mode 100644 index 0000000000..b95917e7b5 --- /dev/null +++ b/core/src/optimistic_confirmation_verifier.rs @@ -0,0 +1,340 @@ +use crate::cluster_info_vote_listener::VoteTracker; +use solana_ledger::blockstore::Blockstore; +use solana_runtime::bank::Bank; +use solana_sdk::{clock::Slot, hash::Hash}; +use std::{collections::BTreeSet, time::Instant}; + +pub struct OptimisticConfirmationVerifier { + snapshot_start_slot: Slot, + unchecked_slots: BTreeSet<(Slot, Hash)>, + last_optimistic_slot_ts: Instant, +} + +impl OptimisticConfirmationVerifier { + pub fn new(snapshot_start_slot: Slot) -> Self { + Self { + snapshot_start_slot, + unchecked_slots: BTreeSet::default(), + last_optimistic_slot_ts: Instant::now(), + } + } + + // Returns any optimistic slots that were not rooted + pub fn get_unrooted_optimistic_slots( + &mut self, + root_bank: &Bank, + blockstore: &Blockstore, + ) -> Vec<(Slot, Hash)> { + let root = root_bank.slot(); + let root_ancestors = &root_bank.ancestors; + let mut slots_before_root = self + .unchecked_slots + .split_off(&((root + 1), Hash::default())); + // `slots_before_root` now contains all slots <= root + std::mem::swap(&mut slots_before_root, &mut self.unchecked_slots); + slots_before_root + .into_iter() + .filter(|(optimistic_slot, hash)| { + (*optimistic_slot == root && *hash != root_bank.hash()) + || (!root_ancestors.contains_key(&optimistic_slot) && + // In this second part of the `and`, we account for the possibility that + // there was some other root `rootX` set in BankForks where: + // + // `root` > `rootX` > `optimistic_slot` + // + // in which case `root` may not contain the ancestor information for + // slots < `rootX`, so we also have to check if `optimistic_slot` was rooted + // through blockstore. + !blockstore.is_root(*optimistic_slot)) + }) + .collect() + } + + pub fn add_new_optimistic_confirmed_slots(&mut self, new_optimistic_slots: Vec<(Slot, Hash)>) { + if new_optimistic_slots.is_empty() { + return; + } + + datapoint_info!( + "optimistic_slot_elapsed", + ( + "average_elapsed_ms", + self.last_optimistic_slot_ts.elapsed().as_millis() as i64, + i64 + ), + ); + + // We don't have any information about ancestors before the snapshot root, + // so ignore those slots + for (new_optimistic_slot, hash) in new_optimistic_slots { + if new_optimistic_slot > self.snapshot_start_slot { + datapoint_info!("optimistic_slot", ("slot", new_optimistic_slot, i64),); + self.unchecked_slots.insert((new_optimistic_slot, hash)); + } + } + + self.last_optimistic_slot_ts = Instant::now(); + } + + pub fn log_unrooted_optimistic_slots( + root_bank: &Bank, + vote_tracker: &VoteTracker, + unrooted_optimistic_slots: &[(Slot, Hash)], + ) { + let root = root_bank.slot(); + for (optimistic_slot, hash) in unrooted_optimistic_slots.iter() { + let epoch = root_bank.epoch_schedule().get_epoch(*optimistic_slot); + let epoch_stakes = root_bank.epoch_stakes(epoch); + let total_epoch_stake = epoch_stakes.map(|e| e.total_stake()).unwrap_or(0); + let voted_stake = { + let slot_tracker = vote_tracker.get_slot_vote_tracker(*optimistic_slot); + let r_slot_tracker = slot_tracker.as_ref().map(|s| s.read().unwrap()); + let voted_stake = r_slot_tracker + .as_ref() + .and_then(|s| s.optimistic_votes_tracker(hash)) + .map(|s| s.stake()) + .unwrap_or(0); + + warn!( + "Optimistic slot {}, hash: {}, epoch: {} was not rooted, + voted keys: {:?}, + root: {}, + root bank hash: {}, + voted stake: {}, + total epoch stake: {}, + pct: {}", + optimistic_slot, + hash, + epoch, + r_slot_tracker + .as_ref() + .and_then(|s| s.optimistic_votes_tracker(hash)) + .map(|s| s.voted()), + root, + root_bank.hash(), + voted_stake, + total_epoch_stake, + voted_stake as f64 / total_epoch_stake as f64, + ); + voted_stake + }; + + datapoint_warn!( + "optimistic_slot_not_rooted", + ("slot", *optimistic_slot, i64), + ("epoch", epoch, i64), + ("root", root, i64), + ("voted_stake", voted_stake, i64), + ("total_epoch_stake", total_epoch_stake, i64), + ); + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::consensus::test::VoteSimulator; + use solana_ledger::get_tmp_ledger_path; + use solana_runtime::bank::Bank; + use solana_sdk::pubkey::Pubkey; + use std::collections::HashMap; + use trees::tr; + + #[test] + fn test_add_new_optimistic_confirmed_slots() { + let snapshot_start_slot = 10; + let bank_hash = Hash::default(); + let mut optimistic_confirmation_verifier = + OptimisticConfirmationVerifier::new(snapshot_start_slot); + optimistic_confirmation_verifier + .add_new_optimistic_confirmed_slots(vec![(snapshot_start_slot - 1, bank_hash)]); + optimistic_confirmation_verifier + .add_new_optimistic_confirmed_slots(vec![(snapshot_start_slot, bank_hash)]); + optimistic_confirmation_verifier + .add_new_optimistic_confirmed_slots(vec![(snapshot_start_slot + 1, bank_hash)]); + assert_eq!(optimistic_confirmation_verifier.unchecked_slots.len(), 1); + assert!(optimistic_confirmation_verifier + .unchecked_slots + .contains(&(snapshot_start_slot + 1, bank_hash))); + } + + #[test] + fn test_get_unrooted_optimistic_slots_same_slot_different_hash() { + let snapshot_start_slot = 0; + let mut optimistic_confirmation_verifier = + OptimisticConfirmationVerifier::new(snapshot_start_slot); + let bad_bank_hash = Hash::new(&[42u8; 32]); + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let optimistic_slots = vec![(1, bad_bank_hash), (3, Hash::default())]; + optimistic_confirmation_verifier.add_new_optimistic_confirmed_slots(optimistic_slots); + let vote_simulator = setup_forks(); + let bank1 = vote_simulator + .bank_forks + .read() + .unwrap() + .get(1) + .cloned() + .unwrap(); + assert_eq!( + optimistic_confirmation_verifier.get_unrooted_optimistic_slots(&bank1, &blockstore), + vec![(1, bad_bank_hash)] + ); + assert_eq!(optimistic_confirmation_verifier.unchecked_slots.len(), 1); + assert!(optimistic_confirmation_verifier + .unchecked_slots + .contains(&(3, Hash::default()))); + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } + + #[test] + fn test_get_unrooted_optimistic_slots() { + let snapshot_start_slot = 0; + let mut optimistic_confirmation_verifier = + OptimisticConfirmationVerifier::new(snapshot_start_slot); + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let mut vote_simulator = setup_forks(); + let optimistic_slots: Vec<_> = vec![1, 3, 5] + .into_iter() + .map(|s| { + ( + s, + vote_simulator + .bank_forks + .read() + .unwrap() + .get(s) + .unwrap() + .hash(), + ) + }) + .collect(); + + // If root is on same fork, nothing should be returned + optimistic_confirmation_verifier + .add_new_optimistic_confirmed_slots(optimistic_slots.clone()); + let bank5 = vote_simulator + .bank_forks + .read() + .unwrap() + .get(5) + .cloned() + .unwrap(); + assert!(optimistic_confirmation_verifier + .get_unrooted_optimistic_slots(&bank5, &blockstore) + .is_empty()); + // 5 is >= than all the unchecked slots, so should clear everything + assert!(optimistic_confirmation_verifier.unchecked_slots.is_empty()); + + // If root is on same fork, nothing should be returned + optimistic_confirmation_verifier + .add_new_optimistic_confirmed_slots(optimistic_slots.clone()); + let bank3 = vote_simulator + .bank_forks + .read() + .unwrap() + .get(3) + .cloned() + .unwrap(); + assert!(optimistic_confirmation_verifier + .get_unrooted_optimistic_slots(&bank3, &blockstore) + .is_empty()); + // 3 is bigger than only slot 1, so slot 5 should be left over + assert_eq!(optimistic_confirmation_verifier.unchecked_slots.len(), 1); + assert!(optimistic_confirmation_verifier + .unchecked_slots + .contains(&optimistic_slots[2])); + + // If root is on different fork, the slots < root on different fork should + // be returned + optimistic_confirmation_verifier + .add_new_optimistic_confirmed_slots(optimistic_slots.clone()); + let bank4 = vote_simulator + .bank_forks + .read() + .unwrap() + .get(4) + .cloned() + .unwrap(); + assert_eq!( + optimistic_confirmation_verifier.get_unrooted_optimistic_slots(&bank4, &blockstore), + vec![optimistic_slots[1]] + ); + // 4 is bigger than only slots 1 and 3, so slot 5 should be left over + assert_eq!(optimistic_confirmation_verifier.unchecked_slots.len(), 1); + assert!(optimistic_confirmation_verifier + .unchecked_slots + .contains(&optimistic_slots[2])); + + // Now set a root at slot 5, purging BankForks of slots < 5 + vote_simulator.set_root(5); + + // Add a new bank 7 that descends from 6 + let bank6 = vote_simulator + .bank_forks + .read() + .unwrap() + .get(6) + .cloned() + .unwrap(); + vote_simulator + .bank_forks + .write() + .unwrap() + .insert(Bank::new_from_parent(&bank6, &Pubkey::default(), 7)); + let bank7 = vote_simulator + .bank_forks + .read() + .unwrap() + .get(7) + .unwrap() + .clone(); + assert!(!bank7.ancestors.contains_key(&3)); + + // Should return slots 1, 3 as part of the rooted fork because there's no + // ancestry information + optimistic_confirmation_verifier + .add_new_optimistic_confirmed_slots(optimistic_slots.clone()); + assert_eq!( + optimistic_confirmation_verifier.get_unrooted_optimistic_slots(&bank7, &blockstore), + optimistic_slots[0..=1].to_vec() + ); + assert!(optimistic_confirmation_verifier.unchecked_slots.is_empty()); + + // If we know set the root in blockstore, should return nothing + blockstore.set_roots(&[1, 3]).unwrap(); + optimistic_confirmation_verifier.add_new_optimistic_confirmed_slots(optimistic_slots); + assert!(optimistic_confirmation_verifier + .get_unrooted_optimistic_slots(&bank7, &blockstore) + .is_empty()); + assert!(optimistic_confirmation_verifier.unchecked_slots.is_empty()); + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } + + fn setup_forks() -> VoteSimulator { + /* + Build fork structure: + slot 0 + | + slot 1 + / \ + slot 2 | + | slot 3 + slot 4 | + slot 5 + | + slot 6 + */ + let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3) / (tr(5) / (tr(6))))); + + let mut vote_simulator = VoteSimulator::new(1); + vote_simulator.fill_bank_forks(forks, &HashMap::new()); + vote_simulator + } +} diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 45368e5745..8c6aa0e61d 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -1197,6 +1197,7 @@ mod test { &keypairs.node_keypair, &keypairs.vote_keypair, &keypairs.vote_keypair, + None, ); bank9.process_transaction(&vote_tx).unwrap(); assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_some()); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 1d1798109c..55c47a0404 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -2706,6 +2706,7 @@ pub(crate) mod tests { &my_keypairs.node_keypair, &my_keypairs.vote_keypair, &my_keypairs.vote_keypair, + None, ); let bank_forks = RwLock::new(bank_forks); diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 288aedda5d..7c911f6002 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -968,6 +968,7 @@ mod tests { node_keypair, vote_keypair, vote_keypair, + None, ); votes_sender.send(vec![vote_tx]).unwrap(); }); @@ -978,7 +979,7 @@ mod tests { ClusterInfoVoteListener::get_and_process_votes_for_tests( &votes_receiver, &vote_tracker, - 0, + &bank, &rpc.subscriptions, &s, &replay_votes_receiver, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 5d3d4b5d0a..0e4ffb0e3a 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -81,6 +81,7 @@ impl Tpu { subscriptions.clone(), verified_vote_sender, replay_votes_receiver, + blockstore.clone(), ); let banking_stage = BankingStage::new( diff --git a/core/src/vote_stake_tracker.rs b/core/src/vote_stake_tracker.rs new file mode 100644 index 0000000000..ac1b20c68c --- /dev/null +++ b/core/src/vote_stake_tracker.rs @@ -0,0 +1,68 @@ +use solana_runtime::commitment::VOTE_THRESHOLD_SIZE; +use solana_sdk::pubkey::Pubkey; +use std::{collections::HashSet, sync::Arc}; + +#[derive(Default)] +pub struct VoteStakeTracker { + voted: HashSet>, + stake: u64, +} + +impl VoteStakeTracker { + // Returns true if the stake that has voted has just crosssed the supermajority + // of stake + pub fn add_vote_pubkey( + &mut self, + vote_pubkey: Arc, + stake: u64, + total_epoch_stake: u64, + ) -> bool { + if !self.voted.contains(&vote_pubkey) { + self.voted.insert(vote_pubkey); + let ratio_before = self.stake as f64 / total_epoch_stake as f64; + self.stake += stake; + let ratio_now = self.stake as f64 / total_epoch_stake as f64; + + ratio_before <= VOTE_THRESHOLD_SIZE && ratio_now > VOTE_THRESHOLD_SIZE + } else { + false + } + } + + pub fn voted(&self) -> &HashSet> { + &self.voted + } + + pub fn stake(&self) -> u64 { + self.stake + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_add_vote_pubkey() { + let total_epoch_stake = 10; + let mut vote_stake_tracker = VoteStakeTracker::default(); + for i in 0..10 { + let pubkey = Arc::new(Pubkey::new_rand()); + let res = vote_stake_tracker.add_vote_pubkey(pubkey.clone(), 1, total_epoch_stake); + let stake = vote_stake_tracker.stake(); + vote_stake_tracker.add_vote_pubkey(pubkey.clone(), 1, total_epoch_stake); + let stake2 = vote_stake_tracker.stake(); + + // Stake should not change from adding same pubkey twice + assert_eq!(stake, stake2); + + // at i == 7, the voted stake is 70%, which is the first time crossing + // the supermajority threshold + if i == 6 { + assert!(res); + } else { + assert!(!res); + } + } + } +} diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs index 191d9488a2..8d314541b1 100644 --- a/core/tests/ledger_cleanup.rs +++ b/core/tests/ledger_cleanup.rs @@ -171,7 +171,7 @@ mod tests { } fn emit_stats( - time_initial: &Instant, + time_initial: Instant, time_previous: &mut Instant, storage_previous: &mut u64, start_slot: u64, @@ -187,7 +187,7 @@ mod tests { println!( "{},{},{},{},{},{},{},{},{},{},{}", - time_now.duration_since(*time_initial).as_millis(), + time_now.duration_since(time_initial).as_millis(), time_now.duration_since(*time_previous).as_millis(), start_slot, batch_size, @@ -249,7 +249,7 @@ mod tests { emit_header(); emit_stats( - &time_initial, + time_initial, &mut time_previous, &mut storage_previous, 0, @@ -273,7 +273,7 @@ mod tests { sender.send(x).unwrap(); emit_stats( - &time_initial, + time_initial, &mut time_previous, &mut storage_previous, x, @@ -303,7 +303,7 @@ mod tests { sender.send(benchmark_slots).unwrap(); emit_stats( - &time_initial, + time_initial, &mut time_previous, &mut storage_previous, benchmark_slots, @@ -324,7 +324,7 @@ mod tests { } emit_stats( - &time_initial, + time_initial, &mut time_previous, &mut storage_previous, benchmark_slots, diff --git a/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json b/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json index 129cf92ffc..bc9569eaa6 100644 --- a/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json +++ b/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json @@ -6206,7 +6206,7 @@ "x": 16, "y": 48 }, - "id": 73, + "id": 40, "legend": { "alignAsTable": false, "avg": false, @@ -7120,7 +7120,7 @@ "x": 16, "y": 54 }, - "id": 40, + "id": 43, "legend": { "alignAsTable": false, "avg": false, @@ -7707,7 +7707,7 @@ "x": 16, "y": 60 }, - "id": 43, + "id": 46, "legend": { "alignAsTable": false, "avg": false, @@ -8495,7 +8495,7 @@ "x": 16, "y": 65 }, - "id": 46, + "id": 49, "legend": { "alignAsTable": false, "avg": false, @@ -8613,7 +8613,7 @@ "x": 16, "y": 71 }, - "id": 49, + "id": 50, "legend": { "alignAsTable": false, "avg": false, @@ -8885,7 +8885,7 @@ "x": 0, "y": 77 }, - "id": 50, + "id": 51, "panels": [], "title": "Tower Consensus", "type": "row" @@ -8908,7 +8908,7 @@ "x": 0, "y": 78 }, - "id": 51, + "id": 52, "legend": { "alignAsTable": false, "avg": false, @@ -9068,7 +9068,7 @@ "x": 8, "y": 78 }, - "id": 52, + "id": 53, "legend": { "alignAsTable": false, "avg": false, @@ -9228,7 +9228,7 @@ "x": 16, "y": 78 }, - "id": 53, + "id": 54, "legend": { "alignAsTable": false, "avg": false, @@ -9405,15 +9405,384 @@ "alignLevel": null } }, + { + "aliasColors": { + "cluster-info.repair": "#ba43a9", + "replay_stage-new_leader.last": "#00ffbb", + "tower-vote.last": "#00ffbb", + "window-service.receive": "#b7dbab", + "window-stage.consumed": "#5195ce" + }, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "fill": 1, + "gridPos": { + "h": 5, + "w": 8, + "x": 0, + "y": 83 + }, + "id": 74, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": false, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 2, + "points": true, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT last(\"slot\") FROM \"$testnet\".\"autogen\".\"optimistic_slot\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Optimistic Slots ($hostid)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": { + "cluster-info.repair": "#ba43a9", + "replay_stage-new_leader.last": "#00ffbb", + "tower-vote.last": "#00ffbb", + "window-service.receive": "#b7dbab", + "window-stage.consumed": "#5195ce" + }, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "fill": 1, + "gridPos": { + "h": 5, + "w": 8, + "x": 8, + "y": 83 + }, + "id": 75, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": false, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 2, + "points": true, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT last(\"slot\") FROM \"$testnet\".\"autogen\".\"optimistic_slot_not_rooted\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Unrooted Optimistic Slots ($hostid)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": { + "cluster-info.repair": "#ba43a9", + "replay_stage-new_leader.last": "#00ffbb", + "tower-vote.last": "#00ffbb", + "window-service.receive": "#b7dbab", + "window-stage.consumed": "#5195ce" + }, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$datasource", + "fill": 1, + "gridPos": { + "h": 5, + "w": 8, + "x": 16, + "y": 83 + }, + "id": 76, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": false, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 2, + "points": true, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT last(\"average_elapsed_ms\") FROM \"$testnet\".\"autogen\".\"optimistic_slot_elapsed\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "Average Time Between Optimistic Confirmations ($hostid)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "collapsed": false, "gridPos": { "h": 1, "w": 24, "x": 0, - "y": 83 + "y": 88 }, - "id": 54, + "id": 55, "panels": [], "repeat": null, "title": "IP Network", @@ -9430,9 +9799,9 @@ "h": 5, "w": 12, "x": 0, - "y": 84 + "y": 89 }, - "id": 55, + "id": 56, "legend": { "alignAsTable": false, "avg": false, @@ -9663,9 +10032,9 @@ "h": 5, "w": 12, "x": 12, - "y": 84 + "y": 89 }, - "id": 56, + "id": 57, "legend": { "alignAsTable": false, "avg": false, @@ -9816,9 +10185,9 @@ "h": 1, "w": 24, "x": 0, - "y": 89 + "y": 94 }, - "id": 57, + "id": 58, "panels": [], "title": "Signature Verification", "type": "row" @@ -9834,9 +10203,9 @@ "h": 5, "w": 12, "x": 0, - "y": 90 + "y": 95 }, - "id": 58, + "id": 59, "legend": { "avg": false, "current": false, @@ -10036,9 +10405,9 @@ "h": 5, "w": 12, "x": 12, - "y": 90 + "y": 95 }, - "id": 59, + "id": 60, "legend": { "alignAsTable": false, "avg": false, @@ -10185,9 +10554,9 @@ "h": 1, "w": 24, "x": 0, - "y": 95 + "y": 100 }, - "id": 60, + "id": 61, "panels": [], "title": "Snapshots", "type": "row" @@ -10203,9 +10572,9 @@ "h": 6, "w": 8, "x": 0, - "y": 96 + "y": 101 }, - "id": 61, + "id": 62, "legend": { "avg": false, "current": false, @@ -10395,9 +10764,9 @@ "h": 6, "w": 8, "x": 8, - "y": 96 + "y": 101 }, - "id": 62, + "id": 63, "legend": { "avg": false, "current": false, @@ -10663,9 +11032,9 @@ "h": 6, "w": 8, "x": 16, - "y": 96 + "y": 101 }, - "id": 63, + "id": 64, "legend": { "avg": false, "current": false, @@ -10857,9 +11226,9 @@ "h": 6, "w": 8, "x": 0, - "y": 102 + "y": 107 }, - "id": 64, + "id": 65, "legend": { "avg": false, "current": false, @@ -11048,9 +11417,9 @@ "h": 1, "w": 24, "x": 0, - "y": 108 + "y": 113 }, - "id": 65, + "id": 66, "panels": [], "title": "RPC Send Transaction Service", "type": "row" @@ -11066,9 +11435,9 @@ "h": 6, "w": 12, "x": 0, - "y": 109 + "y": 114 }, - "id": 66, + "id": 67, "legend": { "avg": false, "current": false, @@ -11184,9 +11553,9 @@ "h": 6, "w": 12, "x": 12, - "y": 109 + "y": 114 }, - "id": 67, + "id": 68, "legend": { "avg": false, "current": false, @@ -11447,9 +11816,9 @@ "h": 1, "w": 24, "x": 0, - "y": 115 + "y": 120 }, - "id": 68, + "id": 69, "panels": [], "title": "Bench TPS", "type": "row" @@ -11465,9 +11834,9 @@ "h": 5, "w": 7, "x": 0, - "y": 116 + "y": 121 }, - "id": 69, + "id": 70, "legend": { "avg": false, "current": false, @@ -11580,9 +11949,9 @@ "h": 5, "w": 7, "x": 7, - "y": 116 + "y": 121 }, - "id": 70, + "id": 71, "legend": { "alignAsTable": false, "avg": false, @@ -11805,9 +12174,9 @@ "h": 5, "w": 10, "x": 14, - "y": 116 + "y": 121 }, - "id": 71, + "id": 72, "links": [], "pageSize": null, "scroll": true, @@ -11893,9 +12262,9 @@ "h": 4, "w": 10, "x": 0, - "y": 121 + "y": 126 }, - "id": 72, + "id": 73, "legend": { "avg": false, "current": false, @@ -11998,7 +12367,7 @@ } } ], - "refresh": "1m", + "refresh": "60s", "schemaVersion": 16, "style": "dark", "tags": [], diff --git a/programs/vote/src/vote_transaction.rs b/programs/vote/src/vote_transaction.rs index 1fa92d9814..99c6ecd555 100644 --- a/programs/vote/src/vote_transaction.rs +++ b/programs/vote/src/vote_transaction.rs @@ -14,13 +14,23 @@ pub fn new_vote_transaction( node_keypair: &Keypair, vote_keypair: &Keypair, authorized_voter_keypair: &Keypair, + switch_proof_hash: Option, ) -> Transaction { let votes = Vote::new(slots, bank_hash); - let vote_ix = vote_instruction::vote( - &vote_keypair.pubkey(), - &authorized_voter_keypair.pubkey(), - votes, - ); + let vote_ix = if let Some(switch_proof_hash) = switch_proof_hash { + vote_instruction::vote_switch( + &vote_keypair.pubkey(), + &authorized_voter_keypair.pubkey(), + votes, + switch_proof_hash, + ) + } else { + vote_instruction::vote( + &vote_keypair.pubkey(), + &authorized_voter_keypair.pubkey(), + votes, + ) + }; let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey()));