diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 98a31b7b0..12641262d 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -391,10 +391,16 @@ impl ClusterInfoVoteListener { .read() .unwrap() .would_be_leader(3 * slot_hashes::MAX_ENTRIES as u64 * DEFAULT_TICKS_PER_SLOT); + let feature_set = poh_recorder + .read() + .unwrap() + .bank() + .map(|bank| bank.feature_set.clone()); if let Err(e) = verified_vote_packets.receive_and_process_vote_packets( &verified_vote_label_packets_receiver, would_be_leader, + feature_set, ) { match e { Error::RecvTimeout(RecvTimeoutError::Disconnected) diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index 4bac5d9ad..709db89d9 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -1,10 +1,19 @@ use { crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result}, solana_perf::packet::PacketBatch, - solana_runtime::{bank::Bank, vote_transaction::VoteTransaction}, + solana_runtime::{ + bank::Bank, + vote_transaction::{VoteTransaction, VoteTransaction::VoteStateUpdate}, + }, solana_sdk::{ - account::from_account, clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signature, - slot_hashes::SlotHashes, sysvar, + account::from_account, + clock::Slot, + feature_set::{allow_votes_to_directly_update_vote_state, FeatureSet}, + hash::Hash, + pubkey::Pubkey, + signature::Signature, + slot_hashes::SlotHashes, + sysvar, }, std::{ collections::{BTreeMap, HashMap, HashSet}, @@ -61,6 +70,32 @@ impl<'a> ValidatorGossipVotesIterator<'a> { previously_sent_to_bank_votes, } } + + fn filter_vote( + &mut self, + slot: &Slot, + hash: &Hash, + packet: &PacketBatch, + tx_signature: &Signature, + ) -> Option { + // Don't send the same vote to the same bank multiple times + if self.previously_sent_to_bank_votes.contains(tx_signature) { + return None; + } + self.previously_sent_to_bank_votes.insert(*tx_signature); + // Filter out votes on the wrong fork (or too old to be) + // on this fork + if self + .slot_hashes + .get(slot) + .map(|found_hash| found_hash == hash) + .unwrap_or(false) + { + Some(packet.clone()) + } else { + None + } + } } /// Each iteration returns all of the missing votes for a single validator, the votes @@ -71,6 +106,7 @@ impl<'a> Iterator for ValidatorGossipVotesIterator<'a> { type Item = Vec; fn next(&mut self) -> Option { + use SingleValidatorVotes::*; // TODO: Maybe prioritize by stake weight while !self.vote_account_keys.is_empty() { let vote_account_key = self.vote_account_keys.pop().unwrap(); @@ -91,30 +127,25 @@ impl<'a> Iterator for ValidatorGossipVotesIterator<'a> { vote_account.vote_state().as_ref().ok().map(|vote_state| { let start_vote_slot = vote_state.last_voted_slot().map(|x| x + 1).unwrap_or(0); - // Filter out the votes that are outdated - validator_gossip_votes - .range((start_vote_slot, Hash::default())..) - .filter_map(|((slot, hash), (packet, tx_signature))| { - if self.previously_sent_to_bank_votes.contains(tx_signature) - { - return None; - } - // Don't send the same vote to the same bank multiple times - self.previously_sent_to_bank_votes.insert(*tx_signature); - // Filter out votes on the wrong fork (or too old to be) - // on this fork - if self - .slot_hashes - .get(slot) - .map(|found_hash| found_hash == hash) - .unwrap_or(false) - { - Some(packet.clone()) - } else { - None - } - }) - .collect::>() + match validator_gossip_votes { + FullTowerVote(GossipVote { + slot, + hash, + packet_batch, + signature, + }) => self + .filter_vote(slot, hash, packet_batch, signature) + .map(|packet| vec![packet]) + .unwrap_or_default(), + IncrementalVotes(validator_gossip_votes) => { + validator_gossip_votes + .range((start_vote_slot, Hash::default())..) + .filter_map(|((slot, hash), (packet, tx_signature))| { + self.filter_vote(slot, hash, packet, tx_signature) + }) + .collect::>() + } + } }) }) }); @@ -128,7 +159,35 @@ impl<'a> Iterator for ValidatorGossipVotesIterator<'a> { } } -pub type SingleValidatorVotes = BTreeMap<(Slot, Hash), (PacketBatch, Signature)>; +#[derive(Debug, Default, Clone)] +pub struct GossipVote { + slot: Slot, + hash: Hash, + packet_batch: PacketBatch, + signature: Signature, +} + +pub enum SingleValidatorVotes { + FullTowerVote(GossipVote), + IncrementalVotes(BTreeMap<(Slot, Hash), (PacketBatch, Signature)>), +} + +impl SingleValidatorVotes { + fn get_latest_gossip_slot(&self) -> Slot { + match self { + Self::FullTowerVote(vote) => vote.slot, + _ => 0, + } + } + + #[cfg(test)] + fn len(&self) -> usize { + match self { + Self::IncrementalVotes(votes) => votes.len(), + _ => 1, + } + } +} #[derive(Default)] pub struct VerifiedVotePackets(HashMap); @@ -138,10 +197,18 @@ impl VerifiedVotePackets { &mut self, vote_packets_receiver: &VerifiedLabelVotePacketsReceiver, would_be_leader: bool, + feature_set: Option>, ) -> Result<()> { + use SingleValidatorVotes::*; const RECV_TIMEOUT: Duration = Duration::from_millis(200); let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?; let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter()); + let mut is_full_tower_vote_enabled = false; + if let Some(feature_set) = feature_set { + is_full_tower_vote_enabled = + feature_set.is_active(&allow_votes_to_directly_update_vote_state::id()); + } + for gossip_votes in vote_packets { if would_be_leader { for verfied_vote_metadata in gossip_votes { @@ -158,12 +225,64 @@ impl VerifiedVotePackets { let slot = vote.last_voted_slot().unwrap(); let hash = vote.hash(); - let validator_votes = self.0.entry(vote_account_key).or_default(); - validator_votes.insert((slot, hash), (packet_batch, signature)); - - if validator_votes.len() > MAX_VOTES_PER_VALIDATOR { - let smallest_key = validator_votes.keys().next().cloned().unwrap(); - validator_votes.remove(&smallest_key).unwrap(); + match (vote, is_full_tower_vote_enabled) { + (VoteStateUpdate(_), true) => { + let latest_gossip_slot = match self.0.get(&vote_account_key) { + Some(vote) => vote.get_latest_gossip_slot(), + _ => 0, + }; + // Since votes are not incremental, we keep only the latest vote + if slot > latest_gossip_slot { + self.0.insert( + vote_account_key, + FullTowerVote(GossipVote { + slot, + hash, + packet_batch, + signature, + }), + ); + } + } + _ => { + if let Some(FullTowerVote(gossip_vote)) = + self.0.get_mut(&vote_account_key) + { + if slot > gossip_vote.slot && is_full_tower_vote_enabled { + warn!( + "Originally {} submitted full tower votes, but now has reverted to incremental votes. Converting back to old format.", + vote_account_key + ); + let mut votes = BTreeMap::new(); + let GossipVote { + slot, + hash, + packet_batch, + signature, + } = std::mem::take(gossip_vote); + votes.insert((slot, hash), (packet_batch, signature)); + self.0.insert(vote_account_key, IncrementalVotes(votes)); + } else { + continue; + } + }; + let validator_votes: &mut BTreeMap< + (Slot, Hash), + (PacketBatch, Signature), + > = match self + .0 + .entry(vote_account_key) + .or_insert(IncrementalVotes(BTreeMap::new())) + { + IncrementalVotes(votes) => votes, + FullTowerVote(_) => continue, // Should never happen + }; + validator_votes.insert((slot, hash), (packet_batch, signature)); + if validator_votes.len() > MAX_VOTES_PER_VALIDATOR { + let smallest_key = validator_votes.keys().next().cloned().unwrap(); + validator_votes.remove(&smallest_key).unwrap(); + } + } } } } @@ -175,12 +294,13 @@ impl VerifiedVotePackets { #[cfg(test)] mod tests { use { - super::*, + super::{SingleValidatorVotes::*, *}, crate::{result::Error, vote_simulator::VoteSimulator}, crossbeam_channel::unbounded, solana_perf::packet::Packet, solana_sdk::slot_hashes::MAX_ENTRIES, - solana_vote_program::vote_state::Vote, + solana_vote_program::vote_state::{Lockout, Vote, VoteStateUpdate}, + std::collections::VecDeque, }; #[test] @@ -203,7 +323,7 @@ mod tests { }]) .unwrap(); verified_vote_packets - .receive_and_process_vote_packets(&r, true) + .receive_and_process_vote_packets(&r, true, None) .unwrap(); assert_eq!( verified_vote_packets @@ -223,7 +343,7 @@ mod tests { }]) .unwrap(); verified_vote_packets - .receive_and_process_vote_packets(&r, true) + .receive_and_process_vote_packets(&r, true, None) .unwrap(); assert_eq!( verified_vote_packets @@ -245,7 +365,7 @@ mod tests { }]) .unwrap(); verified_vote_packets - .receive_and_process_vote_packets(&r, true) + .receive_and_process_vote_packets(&r, true, None) .unwrap(); assert_eq!( verified_vote_packets @@ -268,7 +388,7 @@ mod tests { }]) .unwrap(); verified_vote_packets - .receive_and_process_vote_packets(&r, true) + .receive_and_process_vote_packets(&r, true, None) .unwrap(); assert_eq!( verified_vote_packets @@ -281,7 +401,7 @@ mod tests { // No new messages, should time out assert_matches!( - verified_vote_packets.receive_and_process_vote_packets(&r, true), + verified_vote_packets.receive_and_process_vote_packets(&r, true, None), Err(Error::RecvTimeout(_)) ); } @@ -310,7 +430,7 @@ mod tests { // At most `MAX_VOTES_PER_VALIDATOR` should be stored per validator verified_vote_packets - .receive_and_process_vote_packets(&r, true) + .receive_and_process_vote_packets(&r, true, None) .unwrap(); assert_eq!( verified_vote_packets @@ -348,7 +468,7 @@ mod tests { // Ingest the votes into the buffer let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); verified_vote_packets - .receive_and_process_vote_packets(&r, true) + .receive_and_process_vote_packets(&r, true, None) .unwrap(); // Create tracker for previously sent bank votes @@ -403,7 +523,7 @@ mod tests { // Ingest the votes into the buffer let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); verified_vote_packets - .receive_and_process_vote_packets(&r, true) + .receive_and_process_vote_packets(&r, true, None) .unwrap(); // Check we get two batches, one for each validator. Each batch @@ -466,7 +586,7 @@ mod tests { .unwrap(); // Ingest the votes into the buffer verified_vote_packets - .receive_and_process_vote_packets(&r, true) + .receive_and_process_vote_packets(&r, true, None) .unwrap(); let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new( my_leader_bank, @@ -476,4 +596,263 @@ mod tests { assert!(gossip_votes_iterator.next().is_some()); assert!(gossip_votes_iterator.next().is_none()); } + + #[test] + fn test_only_latest_vote_is_sent_with_feature() { + let (s, r) = unbounded(); + let vote_account_key = solana_sdk::pubkey::new_rand(); + + // Send three vote state updates that are out of order + let first_vote = VoteStateUpdate::from(vec![(2, 4), (4, 3), (6, 2), (7, 1)]); + let second_vote = VoteStateUpdate::from(vec![(2, 4), (4, 3), (11, 1)]); + let third_vote = VoteStateUpdate::from(vec![(2, 5), (4, 4), (11, 3), (12, 2), (13, 1)]); + + for vote in vec![second_vote.clone(), first_vote.clone()] { + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote: VoteTransaction::from(vote), + packet_batch: PacketBatch::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); + } + + let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); + let mut feature_set = FeatureSet::default(); + feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0); + let feature_set = Some(Arc::new(feature_set)); + verified_vote_packets + .receive_and_process_vote_packets(&r, true, feature_set.clone()) + .unwrap(); + + // second_vote should be kept and first_vote ignored + let slot = verified_vote_packets + .0 + .get(&vote_account_key) + .unwrap() + .get_latest_gossip_slot(); + assert_eq!(11, slot); + + // Now send the third_vote, it should overwrite second_vote + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote: VoteTransaction::from(third_vote.clone()), + packet_batch: PacketBatch::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); + + verified_vote_packets + .receive_and_process_vote_packets(&r, true, feature_set) + .unwrap(); + let slot = verified_vote_packets + .0 + .get(&vote_account_key) + .unwrap() + .get_latest_gossip_slot(); + assert_eq!(13, slot); + + // Now send all three, but keep the feature off + for vote in vec![second_vote, first_vote, third_vote] { + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote: VoteTransaction::from(vote), + packet_batch: PacketBatch::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); + } + let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); + let feature_set = FeatureSet::default(); + verified_vote_packets + .receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set))) + .unwrap(); + + assert_eq!( + 3, + verified_vote_packets + .0 + .get(&vote_account_key) + .unwrap() + .len() + ); + } + + #[test] + fn test_latest_vote_feature_upgrade() { + let (s, r) = unbounded(); + let vote_account_key = solana_sdk::pubkey::new_rand(); + + // Send incremental votes + for i in 0..100 { + let vote = VoteTransaction::from(Vote::new(vec![i], Hash::new_unique())); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet_batch: PacketBatch::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); + } + + let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); + // Receive votes without the feature active + let feature_set = Some(Arc::new(FeatureSet::default())); + verified_vote_packets + .receive_and_process_vote_packets(&r, true, feature_set) + .unwrap(); + assert_eq!( + 100, + verified_vote_packets + .0 + .get(&vote_account_key) + .unwrap() + .len() + ); + + // Now send some new votes + for i in 101..201 { + let slots = std::iter::zip((i - 30)..(i + 1), (1..32).rev()) + .map(|(slot, confirmation_count)| Lockout { + slot, + confirmation_count, + }) + .into_iter() + .collect::>(); + let vote = VoteTransaction::from(VoteStateUpdate::new( + slots, + Some(i - 32), + Hash::new_unique(), + )); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet_batch: PacketBatch::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); + } + + // Receive votes with the feature active + let mut feature_set = FeatureSet::default(); + feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0); + verified_vote_packets + .receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set))) + .unwrap(); + if let FullTowerVote(vote) = verified_vote_packets.0.get(&vote_account_key).unwrap() { + assert_eq!(200, vote.slot); + } else { + panic!("Feature active but incremental votes are present"); + } + } + + #[test] + fn test_incremental_votes_with_feature_active() { + let (s, r) = unbounded(); + let vote_account_key = solana_sdk::pubkey::new_rand(); + let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); + + let hash = Hash::new_unique(); + let vote = VoteTransaction::from(Vote::new(vec![42], hash)); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet_batch: PacketBatch::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); + + // Receive incremental votes with the feature active + let mut feature_set = FeatureSet::default(); + feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0); + verified_vote_packets + .receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set))) + .unwrap(); + + // Should still store as incremental votes + if let IncrementalVotes(votes) = verified_vote_packets.0.get(&vote_account_key).unwrap() { + assert!(votes.contains_key(&(42, hash))); + } else { + panic!("Although feature is active, incremental votes should not be stored as full tower votes"); + } + } + + #[test] + fn test_latest_votes_downgrade_full_to_incremental() { + let (s, r) = unbounded(); + let vote_account_key = solana_sdk::pubkey::new_rand(); + let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); + + let vote = VoteTransaction::from(VoteStateUpdate::from(vec![(42, 1)])); + let hash_42 = vote.hash(); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet_batch: PacketBatch::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); + + // Receive full votes + let mut feature_set = FeatureSet::default(); + feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0); + verified_vote_packets + .receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set))) + .unwrap(); + assert_eq!( + 42, + verified_vote_packets + .0 + .get(&vote_account_key) + .unwrap() + .get_latest_gossip_slot() + ); + + // Try to send an old ibncremental vote from pre feature activation + let vote = VoteTransaction::from(Vote::new(vec![34], Hash::new_unique())); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet_batch: PacketBatch::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); + + // Try to receive nothing should happen + let mut feature_set = FeatureSet::default(); + feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0); + verified_vote_packets + .receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set))) + .unwrap(); + if let FullTowerVote(vote) = verified_vote_packets.0.get(&vote_account_key).unwrap() { + assert_eq!(42, vote.slot); + } else { + panic!("Old vote triggered a downgrade conversion"); + } + + // Now try to send an incremental vote + let vote = VoteTransaction::from(Vote::new(vec![43], Hash::new_unique())); + let hash_43 = vote.hash(); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet_batch: PacketBatch::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); + + // Try to receive and vote lands as well as the conversion back to incremental votes + let mut feature_set = FeatureSet::default(); + feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0); + verified_vote_packets + .receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set))) + .unwrap(); + if let IncrementalVotes(votes) = verified_vote_packets.0.get(&vote_account_key).unwrap() { + assert!(votes.contains_key(&(42, hash_42))); + assert!(votes.contains_key(&(43, hash_43))); + assert_eq!(2, votes.len()); + } else { + panic!("Conversion back to incremental votes failed"); + } + } }