From 0264147d42d506fb888f5c4c021a998e231a3e74 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 24 Oct 2020 10:19:12 +0900 Subject: [PATCH] Clean up opt conf verifier and vote state tracker (#13081) * Clean up opt conf verifier and vote state tracker * Update test to follow new message and some knob * Rename --- core/src/cluster_info_vote_listener.rs | 185 ++++++++++--------- core/src/optimistic_confirmation_verifier.rs | 29 +-- core/src/vote_stake_tracker.rs | 11 +- local-cluster/tests/local_cluster.rs | 20 +- 4 files changed, 133 insertions(+), 112 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index b41bc3b84..fe55f529f 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -27,7 +27,7 @@ use solana_runtime::{ vote_sender_types::{ReplayVoteReceiver, ReplayedVote}, }; use solana_sdk::{ - clock::{Epoch, Slot}, + clock::{Epoch, Slot, DEFAULT_MS_PER_SLOT}, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey, @@ -98,7 +98,7 @@ impl VoteTracker { epoch_schedule: *root_bank.epoch_schedule(), ..VoteTracker::default() }; - vote_tracker.process_new_root_bank(&root_bank); + vote_tracker.progress_with_new_root_bank(&root_bank); assert_eq!( *vote_tracker.leader_schedule_epoch.read().unwrap(), root_bank.get_leader_schedule_epoch(root_bank.slot()) @@ -174,7 +174,7 @@ impl VoteTracker { self.keys.get_or_insert(&pubkey); } - fn update_leader_schedule_epoch(&self, root_bank: &Bank) { + fn progress_leader_schedule_epoch(&self, root_bank: &Bank) { // Update with any newly calculated epoch state about future epochs let start_leader_schedule_epoch = *self.leader_schedule_epoch.read().unwrap(); let mut greatest_leader_schedule_epoch = start_leader_schedule_epoch; @@ -205,7 +205,7 @@ impl VoteTracker { } } - fn update_new_root(&self, root_bank: &Bank) { + fn purge_stale_state(&self, root_bank: &Bank) { // Purge any outdated slot data let new_root = root_bank.slot(); let root_epoch = root_bank.epoch(); @@ -220,15 +220,15 @@ impl VoteTracker { self.epoch_authorized_voters .write() .unwrap() - .retain(|epoch, _| epoch >= &root_epoch); + .retain(|epoch, _| *epoch >= root_epoch); self.keys.purge(); *self.current_epoch.write().unwrap() = root_epoch; } } - fn process_new_root_bank(&self, root_bank: &Bank) { - self.update_leader_schedule_epoch(root_bank); - self.update_new_root(root_bank); + fn progress_with_new_root_bank(&self, root_bank: &Bank) { + self.progress_leader_schedule_epoch(root_bank); + self.purge_stale_state(root_bank); } } @@ -425,7 +425,7 @@ impl ClusterInfoVoteListener { blockstore: Arc, bank_notification_sender: Option, ) -> Result<()> { - let mut optimistic_confirmation_verifier = + let mut confirmation_verifier = OptimisticConfirmationVerifier::new(bank_forks.read().unwrap().root()); let mut last_process_root = Instant::now(); loop { @@ -434,21 +434,21 @@ impl ClusterInfoVoteListener { } let root_bank = bank_forks.read().unwrap().root_bank().clone(); - if last_process_root.elapsed().as_millis() > 400 { - let unrooted_optimistic_slots = optimistic_confirmation_verifier - .get_unrooted_optimistic_slots(&root_bank, &blockstore); + if last_process_root.elapsed().as_millis() > DEFAULT_MS_PER_SLOT as u128 { + let unrooted_optimistic_slots = confirmation_verifier + .verify_for_unrooted_optimistic_slots(&root_bank, &blockstore); // SlotVoteTracker's for all `slots` in `unrooted_optimistic_slots` // should still be available because we haven't purged in - // `process_new_root_bank()` yet, which is called below + // `progress_with_new_root_bank()` yet, which is called below OptimisticConfirmationVerifier::log_unrooted_optimistic_slots( &root_bank, &vote_tracker, &unrooted_optimistic_slots, ); - vote_tracker.process_new_root_bank(&root_bank); + vote_tracker.progress_with_new_root_bank(&root_bank); last_process_root = Instant::now(); } - let optimistic_confirmed_slots = Self::get_and_process_votes( + let confirmed_slots = Self::listen_and_confirm_votes( &gossip_vote_txs_receiver, &vote_tracker, &root_bank, @@ -457,19 +457,17 @@ impl ClusterInfoVoteListener { &replay_votes_receiver, &bank_notification_sender, ); - - if let Err(e) = optimistic_confirmed_slots { - match e { + match confirmed_slots { + Ok(confirmed_slots) => { + confirmation_verifier.add_new_optimistic_confirmed_slots(confirmed_slots); + } + Err(e) => match e { Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) | Error::ReadyTimeoutError => (), _ => { error!("thread {:?} error {:?}", thread::current().name(), e); } - } - } else { - let optimistic_confirmed_slots = optimistic_confirmed_slots.unwrap(); - optimistic_confirmation_verifier - .add_new_optimistic_confirmed_slots(optimistic_confirmed_slots); + }, } } } @@ -483,7 +481,7 @@ impl ClusterInfoVoteListener { verified_vote_sender: &VerifiedVoteSender, replay_votes_receiver: &ReplayVoteReceiver, ) -> Result> { - Self::get_and_process_votes( + Self::listen_and_confirm_votes( gossip_vote_txs_receiver, vote_tracker, root_bank, @@ -494,7 +492,7 @@ impl ClusterInfoVoteListener { ) } - fn get_and_process_votes( + fn listen_and_confirm_votes( gossip_vote_txs_receiver: &VerifiedVoteTransactionsReceiver, vote_tracker: &VoteTracker, root_bank: &Bank, @@ -523,7 +521,7 @@ impl ClusterInfoVoteListener { let gossip_vote_txs: Vec<_> = gossip_vote_txs_receiver.try_iter().flatten().collect(); let replay_votes: Vec<_> = replay_votes_receiver.try_iter().collect(); if !gossip_vote_txs.is_empty() || !replay_votes.is_empty() { - return Ok(Self::process_votes( + return Ok(Self::filter_and_confirm_with_new_votes( vote_tracker, gossip_vote_txs, replay_votes, @@ -541,7 +539,7 @@ impl ClusterInfoVoteListener { } #[allow(clippy::too_many_arguments)] - fn update_new_votes( + fn track_new_votes_and_notify_confirmations( vote: Vote, vote_pubkey: &Pubkey, vote_tracker: &VoteTracker, @@ -557,56 +555,52 @@ impl ClusterInfoVoteListener { return; } - let last_vote_slot = vote.slots.last().unwrap(); + let last_vote_slot = *vote.slots.last().unwrap(); + let last_vote_hash = vote.hash; let root = root_bank.slot(); - let last_vote_hash = vote.hash; let mut is_new_vote = false; - for slot in vote.slots.iter().rev() { - // If slot is before the root, or so far ahead we don't have - // stake information, then ignore it - let epoch = root_bank.epoch_schedule().get_epoch(*slot); + // If slot is before the root, ignore it + for slot in vote.slots.iter().filter(|slot| **slot > root).rev() { + let slot = *slot; + + // if we don't have stake information, ignore it + let epoch = root_bank.epoch_schedule().get_epoch(slot); let epoch_stakes = root_bank.epoch_stakes(epoch); - if *slot <= root || epoch_stakes.is_none() { + if epoch_stakes.is_none() { continue; } let epoch_stakes = epoch_stakes.unwrap(); - let epoch_vote_accounts = Stakes::vote_accounts(epoch_stakes.stakes()); - let total_epoch_stake = epoch_stakes.total_stake(); let unduplicated_pubkey = vote_tracker.keys.get_or_insert(&vote_pubkey); // The last vote slot, which is the greatest slot in the stack // of votes in a vote transaction, qualifies for optimistic confirmation. - let update_optimistic_confirmation_info = if slot == last_vote_slot { - let stake = epoch_vote_accounts + if slot == last_vote_slot { + let vote_accounts = Stakes::vote_accounts(epoch_stakes.stakes()); + let stake = vote_accounts .get(&vote_pubkey) .map(|(stake, _)| *stake) - .unwrap_or(0); - Some((stake, last_vote_hash)) - } else { - None - }; + .unwrap_or_default(); + let total_stake = epoch_stakes.total_stake(); - // If this vote for this slot qualifies for optimistic confirmation - if let Some((stake, hash)) = update_optimistic_confirmation_info { // Fast track processing of the last slot in a vote transactions // so that notifications for optimistic confirmation can be sent // as soon as possible. - let (is_confirmed, is_new) = Self::add_optimistic_confirmation_vote( + let (is_confirmed, is_new) = Self::track_optimistic_confirmation_vote( vote_tracker, - *slot, - hash, + last_vote_slot, + last_vote_hash, unduplicated_pubkey.clone(), stake, - total_epoch_stake, + total_stake, ); if is_confirmed { - new_optimistic_confirmed_slots.push((*slot, last_vote_hash)); + new_optimistic_confirmed_slots.push((last_vote_slot, last_vote_hash)); // Notify subscribers about new optimistic confirmation if let Some(sender) = bank_notification_sender { sender - .send(BankNotification::OptimisticallyConfirmed(*slot)) + .send(BankNotification::OptimisticallyConfirmed(last_vote_slot)) .unwrap_or_else(|err| { warn!("bank_notification_sender failed: {:?}", err) }); @@ -617,7 +611,7 @@ impl ClusterInfoVoteListener { // By now: // 1) The vote must have come from ReplayStage, // 2) We've seen this vote from replay for this hash before - // (`add_optimistic_confirmation_vote()` will not set `is_new == true` + // (`track_optimistic_confirmation_vote()` will not set `is_new == true` // for same slot different hash), so short circuit because this vote // has no new information @@ -629,7 +623,7 @@ impl ClusterInfoVoteListener { is_new_vote = is_new; } - diff.entry(*slot) + diff.entry(slot) .or_default() .entry(unduplicated_pubkey) .and_modify(|seen_in_gossip_previously| { @@ -644,7 +638,40 @@ impl ClusterInfoVoteListener { } } - fn process_votes( + fn filter_gossip_votes( + vote_tracker: &VoteTracker, + vote_pubkey: &Pubkey, + vote: &Vote, + gossip_tx: &Transaction, + ) -> bool { + if vote.slots.is_empty() { + return false; + } + let last_vote_slot = vote.slots.last().unwrap(); + // Votes from gossip need to be verified as they have not been + // verified by the replay pipeline. Determine the authorized voter + // based on the last vote slot. This will drop votes from authorized + // voters trying to make votes for slots earlier than the epoch for + // which they are authorized + let actual_authorized_voter = + vote_tracker.get_authorized_voter(&vote_pubkey, *last_vote_slot); + + if actual_authorized_voter.is_none() { + return false; + } + + // Voting without the correct authorized pubkey, dump the vote + if !VoteTracker::vote_contains_authorized_voter( + &gossip_tx, + &actual_authorized_voter.unwrap(), + ) { + return false; + } + + true + } + + fn filter_and_confirm_with_new_votes( vote_tracker: &VoteTracker, gossip_vote_txs: Vec, replayed_votes: Vec, @@ -662,37 +689,13 @@ impl ClusterInfoVoteListener { .filter_map(|gossip_tx| { vote_transaction::parse_vote_transaction(gossip_tx) .filter(|(vote_pubkey, vote, _)| { - if vote.slots.is_empty() { - return false; - } - let last_vote_slot = vote.slots.last().unwrap(); - // Votes from gossip need to be verified as they have not been - // verified by the replay pipeline. Determine the authorized voter - // based on the last vote slot. This will drop votes from authorized - // voters trying to make votes for slots earlier than the epoch for - // which they are authorized - let actual_authorized_voter = - vote_tracker.get_authorized_voter(&vote_pubkey, *last_vote_slot); - - if actual_authorized_voter.is_none() { - return false; - } - - // Voting without the correct authorized pubkey, dump the vote - if !VoteTracker::vote_contains_authorized_voter( - &gossip_tx, - &actual_authorized_voter.unwrap(), - ) { - return false; - } - - true + Self::filter_gossip_votes(vote_tracker, vote_pubkey, vote, gossip_tx) }) .map(|v| (true, v)) }) .chain(replayed_votes.into_iter().map(|v| (false, v))) { - Self::update_new_votes( + Self::track_new_votes_and_notify_confirmations( vote, &vote_pubkey, &vote_tracker, @@ -757,7 +760,7 @@ impl ClusterInfoVoteListener { // Returns if the slot was optimistically confirmed, and whether // the slot was new - fn add_optimistic_confirmation_vote( + fn track_optimistic_confirmation_vote( vote_tracker: &VoteTracker, slot: Slot, hash: Hash, @@ -909,7 +912,7 @@ mod tests { .unwrap() .contains_key(&bank.slot())); let bank1 = Bank::new_from_parent(&bank, &Pubkey::default(), bank.slot() + 1); - vote_tracker.process_new_root_bank(&bank1); + vote_tracker.progress_with_new_root_bank(&bank1); assert!(!vote_tracker .slot_vote_trackers .read() @@ -926,7 +929,7 @@ mod tests { bank.epoch_schedule() .get_first_slot_in_epoch(current_epoch + 1), ); - vote_tracker.process_new_root_bank(&new_epoch_bank); + vote_tracker.progress_with_new_root_bank(&new_epoch_bank); assert!(!vote_tracker.keys.0.read().unwrap().contains(&new_voter)); assert_eq!( *vote_tracker.current_epoch.read().unwrap(), @@ -956,7 +959,7 @@ mod tests { ); let next_leader_schedule_bank = Bank::new_from_parent(&bank, &Pubkey::default(), next_leader_schedule_computed); - vote_tracker.update_leader_schedule_epoch(&next_leader_schedule_bank); + vote_tracker.progress_leader_schedule_epoch(&next_leader_schedule_bank); assert_eq!( *vote_tracker.leader_schedule_epoch.read().unwrap(), next_leader_schedule_epoch @@ -1007,7 +1010,7 @@ mod tests { &votes_sender, &replay_votes_sender, ); - ClusterInfoVoteListener::get_and_process_votes( + ClusterInfoVoteListener::listen_and_confirm_votes( &votes_receiver, &vote_tracker, &bank3, @@ -1036,7 +1039,7 @@ mod tests { &votes_sender, &replay_votes_sender, ); - ClusterInfoVoteListener::get_and_process_votes( + ClusterInfoVoteListener::listen_and_confirm_votes( &votes_receiver, &vote_tracker, &bank3, @@ -1114,7 +1117,7 @@ mod tests { ); // Check that all the votes were registered for each validator correctly - ClusterInfoVoteListener::get_and_process_votes( + ClusterInfoVoteListener::listen_and_confirm_votes( &votes_txs_receiver, &vote_tracker, &bank0, @@ -1233,7 +1236,7 @@ mod tests { } // Read and process votes from channel `votes_receiver` - ClusterInfoVoteListener::get_and_process_votes( + ClusterInfoVoteListener::listen_and_confirm_votes( &votes_txs_receiver, &vote_tracker, &bank0, @@ -1328,7 +1331,7 @@ mod tests { )) .unwrap(); } - let _ = ClusterInfoVoteListener::get_and_process_votes( + let _ = ClusterInfoVoteListener::listen_and_confirm_votes( &votes_receiver, &vote_tracker, &bank, @@ -1474,7 +1477,7 @@ mod tests { )]; let (verified_vote_sender, _verified_vote_receiver) = unbounded(); - ClusterInfoVoteListener::process_votes( + ClusterInfoVoteListener::filter_and_confirm_with_new_votes( &vote_tracker, vote_tx, // Add gossip vote for same slot, should not affect outcome @@ -1545,7 +1548,7 @@ mod tests { let new_root_bank = Bank::new_from_parent(&bank, &Pubkey::default(), first_slot_in_new_epoch - 2); - ClusterInfoVoteListener::process_votes( + ClusterInfoVoteListener::filter_and_confirm_with_new_votes( &vote_tracker, vote_txs, vec![( diff --git a/core/src/optimistic_confirmation_verifier.rs b/core/src/optimistic_confirmation_verifier.rs index 59cfea28e..6746da694 100644 --- a/core/src/optimistic_confirmation_verifier.rs +++ b/core/src/optimistic_confirmation_verifier.rs @@ -20,7 +20,7 @@ impl OptimisticConfirmationVerifier { } // Returns any optimistic slots that were not rooted - pub fn get_unrooted_optimistic_slots( + pub fn verify_for_unrooted_optimistic_slots( &mut self, root_bank: &Bank, blockstore: &Blockstore, @@ -34,8 +34,8 @@ impl OptimisticConfirmationVerifier { std::mem::swap(&mut slots_before_root, &mut self.unchecked_slots); slots_before_root .into_iter() - .filter(|(optimistic_slot, hash)| { - (*optimistic_slot == root && *hash != root_bank.hash()) + .filter(|(optimistic_slot, optimistic_hash)| { + (*optimistic_slot == root && *optimistic_hash != root_bank.hash()) || (!root_ancestors.contains_key(&optimistic_slot) && // In this second part of the `and`, we account for the possibility that // there was some other root `rootX` set in BankForks where: @@ -76,6 +76,10 @@ impl OptimisticConfirmationVerifier { self.last_optimistic_slot_ts = Instant::now(); } + pub fn format_optimistic_confirmd_slot_violation_log(slot: Slot) -> String { + format!("Optimistically confirmed slot {} was not rooted", slot) + } + pub fn log_unrooted_optimistic_slots( root_bank: &Bank, vote_tracker: &VoteTracker, @@ -96,7 +100,7 @@ impl OptimisticConfirmationVerifier { .unwrap_or(0); error!( - "Optimistic slot {} was not rooted, + "{}, hash: {}, epoch: {}, voted keys: {:?}, @@ -105,7 +109,7 @@ impl OptimisticConfirmationVerifier { voted stake: {}, total epoch stake: {}, pct: {}", - optimistic_slot, + Self::format_optimistic_confirmd_slot_violation_log(*optimistic_slot), hash, epoch, r_slot_tracker @@ -181,7 +185,8 @@ mod test { .cloned() .unwrap(); assert_eq!( - optimistic_confirmation_verifier.get_unrooted_optimistic_slots(&bank1, &blockstore), + optimistic_confirmation_verifier + .verify_for_unrooted_optimistic_slots(&bank1, &blockstore), vec![(1, bad_bank_hash)] ); assert_eq!(optimistic_confirmation_verifier.unchecked_slots.len(), 1); @@ -228,7 +233,7 @@ mod test { .cloned() .unwrap(); assert!(optimistic_confirmation_verifier - .get_unrooted_optimistic_slots(&bank5, &blockstore) + .verify_for_unrooted_optimistic_slots(&bank5, &blockstore) .is_empty()); // 5 is >= than all the unchecked slots, so should clear everything assert!(optimistic_confirmation_verifier.unchecked_slots.is_empty()); @@ -244,7 +249,7 @@ mod test { .cloned() .unwrap(); assert!(optimistic_confirmation_verifier - .get_unrooted_optimistic_slots(&bank3, &blockstore) + .verify_for_unrooted_optimistic_slots(&bank3, &blockstore) .is_empty()); // 3 is bigger than only slot 1, so slot 5 should be left over assert_eq!(optimistic_confirmation_verifier.unchecked_slots.len(), 1); @@ -264,7 +269,8 @@ mod test { .cloned() .unwrap(); assert_eq!( - optimistic_confirmation_verifier.get_unrooted_optimistic_slots(&bank4, &blockstore), + optimistic_confirmation_verifier + .verify_for_unrooted_optimistic_slots(&bank4, &blockstore), vec![optimistic_slots[1]] ); // 4 is bigger than only slots 1 and 3, so slot 5 should be left over @@ -303,7 +309,8 @@ mod test { optimistic_confirmation_verifier .add_new_optimistic_confirmed_slots(optimistic_slots.clone()); assert_eq!( - optimistic_confirmation_verifier.get_unrooted_optimistic_slots(&bank7, &blockstore), + optimistic_confirmation_verifier + .verify_for_unrooted_optimistic_slots(&bank7, &blockstore), optimistic_slots[0..=1].to_vec() ); assert!(optimistic_confirmation_verifier.unchecked_slots.is_empty()); @@ -312,7 +319,7 @@ mod test { blockstore.set_roots(&[1, 3]).unwrap(); optimistic_confirmation_verifier.add_new_optimistic_confirmed_slots(optimistic_slots); assert!(optimistic_confirmation_verifier - .get_unrooted_optimistic_slots(&bank7, &blockstore) + .verify_for_unrooted_optimistic_slots(&bank7, &blockstore) .is_empty()); assert!(optimistic_confirmation_verifier.unchecked_slots.is_empty()); } diff --git a/core/src/vote_stake_tracker.rs b/core/src/vote_stake_tracker.rs index a5c051ac0..654da9167 100644 --- a/core/src/vote_stake_tracker.rs +++ b/core/src/vote_stake_tracker.rs @@ -17,16 +17,17 @@ impl VoteStakeTracker { &mut self, vote_pubkey: Arc, stake: u64, - total_epoch_stake: u64, + total_stake: u64, ) -> (bool, bool) { let is_new = !self.voted.contains(&vote_pubkey); if is_new { self.voted.insert(vote_pubkey); - let supermajority_stake = (total_epoch_stake as f64 * VOTE_THRESHOLD_SIZE) as u64; - let previous_stake = self.stake; - self.stake += stake; + let supermajority_stake = (total_stake as f64 * VOTE_THRESHOLD_SIZE) as u64; + let old_stake = self.stake; + let new_stake = self.stake + stake; + self.stake = new_stake; ( - previous_stake <= supermajority_stake && self.stake > supermajority_stake, + old_stake <= supermajority_stake && supermajority_stake < new_stake, is_new, ) } else { diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index a214c636a..3a4dc8d50 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -11,6 +11,7 @@ use solana_core::{ cluster_info::VALIDATOR_PORT_RANGE, consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, gossip_service::discover_cluster, + optimistic_confirmation_verifier::OptimisticConfirmationVerifier, validator::ValidatorConfig, }; use solana_download_utils::download_snapshot; @@ -1380,7 +1381,9 @@ fn test_no_voting() { #[serial] fn test_optimistic_confirmation_violation_detection() { solana_logger::setup(); - let mut buf = BufferRedirect::stderr().unwrap(); + let buf = std::env::var("OPTIMISTIC_CONF_TEST_DUMP_LOG") + .err() + .map(|_| BufferRedirect::stderr().unwrap()); // First set up the cluster with 2 nodes let slots_per_epoch = 2048; let node_stakes = vec![51, 50]; @@ -1467,10 +1470,17 @@ fn test_optimistic_confirmation_violation_detection() { // Check to see that validator detected optimistic confirmation for // `prev_voted_slot` failed - let expected_log = format!("Optimistic slot {} was not rooted", prev_voted_slot); - let mut output = String::new(); - buf.read_to_string(&mut output).unwrap(); - assert!(output.contains(&expected_log)); + let expected_log = + OptimisticConfirmationVerifier::format_optimistic_confirmd_slot_violation_log( + prev_voted_slot, + ); + if let Some(mut buf) = buf { + let mut output = String::new(); + buf.read_to_string(&mut output).unwrap(); + assert!(output.contains(&expected_log)); + } else { + panic!("dumped log and disaled testing"); + } } #[test]