From c0c60386544ec9a9ec7119229f37386d9f070523 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 30 Dec 2021 15:03:14 +0000 Subject: [PATCH] checks for authorized voter early on in the vote-listener pipeline (#22169) Before votes are verified that they are signed by the authorized voter, they might be dropped in verified-vote-packets code. If there are enough many spam votes from unauthorized voters, this may potentially drop valid votes but keep the false ones. https://github.com/solana-labs/solana/blob/57986f982/core/src/verified_vote_packets.rs#L165-L168 --- core/src/cluster_info_vote_listener.rs | 169 +++++++++++++++---------- core/src/tpu.rs | 4 +- 2 files changed, 101 insertions(+), 72 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 18bc79986e..9691895f38 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -13,7 +13,6 @@ use { unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Select, Sender as CrossbeamSender, }, - itertools::izip, log::*, solana_gossip::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, @@ -297,10 +296,10 @@ pub struct ClusterInfoVoteListener { impl ClusterInfoVoteListener { #[allow(clippy::too_many_arguments)] pub fn new( - exit: &Arc, + exit: Arc, cluster_info: Arc, verified_packets_sender: CrossbeamSender>, - poh_recorder: &Arc>, + poh_recorder: Arc>, vote_tracker: Arc, bank_forks: Arc>, subscriptions: Arc, @@ -311,25 +310,26 @@ impl ClusterInfoVoteListener { bank_notification_sender: Option, cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender, ) -> Self { - let exit_ = exit.clone(); - let (verified_vote_label_packets_sender, verified_vote_label_packets_receiver) = unbounded(); let (verified_vote_transactions_sender, verified_vote_transactions_receiver) = unbounded(); - let listen_thread = Builder::new() - .name("solana-cluster_info_vote_listener".to_string()) - .spawn(move || { - let _ = Self::recv_loop( - exit_, - &cluster_info, - verified_vote_label_packets_sender, - verified_vote_transactions_sender, - ); - }) - .unwrap(); - + let listen_thread = { + let exit = exit.clone(); + let bank_forks = bank_forks.clone(); + Builder::new() + .name("solana-cluster_info_vote_listener".to_string()) + .spawn(move || { + let _ = Self::recv_loop( + exit, + &cluster_info, + &bank_forks, + verified_vote_label_packets_sender, + verified_vote_transactions_sender, + ); + }) + .unwrap() + }; let exit_ = exit.clone(); - let poh_recorder = poh_recorder.clone(); let bank_send_thread = Builder::new() .name("solana-cluster_info_bank_send".to_string()) .spawn(move || { @@ -342,12 +342,11 @@ impl ClusterInfoVoteListener { }) .unwrap(); - let exit_ = exit.clone(); let send_thread = Builder::new() .name("solana-cluster_info_process_votes".to_string()) .spawn(move || { let _ = Self::process_votes_loop( - exit_, + exit, verified_vote_transactions_receiver, vote_tracker, bank_forks, @@ -367,16 +366,14 @@ impl ClusterInfoVoteListener { } } - pub fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; - } - Ok(()) + pub(crate) fn join(self) -> thread::Result<()> { + self.thread_hdls.into_iter().try_for_each(JoinHandle::join) } fn recv_loop( exit: Arc, cluster_info: &ClusterInfo, + bank_forks: &RwLock, verified_vote_label_packets_sender: VerifiedLabelVotePacketsSender, verified_vote_transactions_sender: VerifiedVoteTransactionsSender, ) -> Result<()> { @@ -385,7 +382,7 @@ impl ClusterInfoVoteListener { let votes = cluster_info.get_votes(&mut cursor); inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len()); if !votes.is_empty() { - let (vote_txs, packets) = Self::verify_votes(votes); + let (vote_txs, packets) = Self::verify_votes(votes, bank_forks); verified_vote_transactions_sender.send(vote_txs)?; verified_vote_label_packets_sender.send(packets)?; } @@ -395,43 +392,45 @@ impl ClusterInfoVoteListener { } #[allow(clippy::type_complexity)] - fn verify_votes(votes: Vec) -> (Vec, Vec) { + fn verify_votes( + votes: Vec, + bank_forks: &RwLock, + ) -> (Vec, Vec) { let mut packet_batches = packet::to_packet_batches(&votes, 1); // Votes should already be filtered by this point. - let reject_non_vote = false; - sigverify::ed25519_verify_cpu(&mut packet_batches, reject_non_vote); - - let (vote_txs, vote_metadata) = izip!(votes.into_iter(), packet_batches) - .filter_map(|(vote_tx, packet_batch)| { - let (vote, vote_account_key) = vote_transaction::parse_vote_transaction(&vote_tx) - .and_then(|(vote_account_key, vote, _)| { - if vote.slots().is_empty() { - None - } else { - Some((vote, vote_account_key)) - } - })?; - + sigverify::ed25519_verify_cpu(&mut packet_batches, /*reject_non_vote=*/ false); + let root_bank = bank_forks.read().unwrap().root_bank(); + let epoch_schedule = root_bank.epoch_schedule(); + votes + .into_iter() + .zip(packet_batches) + .filter(|(_, packet_batch)| { // to_packet_batches() above splits into 1 packet long batches assert_eq!(packet_batch.packets.len(), 1); - if !packet_batch.packets[0].meta.discard { - if let Some(signature) = vote_tx.signatures.first().cloned() { - return Some(( - vote_tx, - VerifiedVoteMetadata { - vote_account_key, - vote, - packet_batch, - signature, - }, - )); - } - } - None + !packet_batch.packets[0].meta.discard }) - .unzip(); - (vote_txs, vote_metadata) + .filter_map(|(tx, packet_batch)| { + let (vote_account_key, vote, _) = vote_transaction::parse_vote_transaction(&tx)?; + let slot = vote.last_voted_slot()?; + let epoch = epoch_schedule.get_epoch(slot); + let authorized_voter = root_bank + .epoch_stakes(epoch)? + .epoch_authorized_voters() + .get(&vote_account_key)?; + let mut keys = tx.message.account_keys.iter().enumerate(); + if !keys.any(|(i, key)| tx.message.is_signer(i) && key == authorized_voter) { + return None; + } + let verified_vote_metadata = VerifiedVoteMetadata { + vote_account_key, + vote, + packet_batch, + signature: *tx.signatures.first()?, + }; + Some((tx, verified_vote_metadata)) + }) + .unzip() } fn bank_send_loop( @@ -558,7 +557,7 @@ impl ClusterInfoVoteListener { return Ok(()); } - let root_bank = bank_forks.read().unwrap().root_bank().clone(); + let root_bank = bank_forks.read().unwrap().root_bank(); if last_process_root.elapsed().as_millis() > DEFAULT_MS_PER_SLOT as u128 { let unrooted_optimistic_slots = confirmation_verifier .verify_for_unrooted_optimistic_slots(&root_bank, &blockstore); @@ -965,6 +964,7 @@ mod tests { solana_vote_program::vote_state::Vote, std::{ collections::BTreeSet, + iter::repeat_with, sync::{atomic::AtomicU64, Arc}, }, }; @@ -1817,8 +1817,11 @@ mod tests { #[test] fn test_verify_votes_empty() { solana_logger::setup(); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = RwLock::new(BankForks::new(bank)); let votes = vec![]; - let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes); + let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, &bank_forks); assert!(vote_txs.is_empty()); assert!(packets.is_empty()); } @@ -1831,25 +1834,40 @@ mod tests { assert_eq!(num_packets, ref_value); } - fn test_vote_tx(hash: Option) -> Transaction { - let node_keypair = Keypair::new(); - let vote_keypair = Keypair::new(); - let auth_voter_keypair = Keypair::new(); + fn test_vote_tx( + validator_vote_keypairs: Option<&ValidatorVoteKeypairs>, + hash: Option, + ) -> Transaction { + let other = ValidatorVoteKeypairs::new_rand(); + let validator_vote_keypair = validator_vote_keypairs.unwrap_or(&other); + // TODO authorized_voter_keypair should be different from vote-keypair + // but that is what create_genesis_... currently generates. vote_transaction::new_vote_transaction( vec![0], Hash::default(), Hash::default(), - &node_keypair, - &vote_keypair, - &auth_voter_keypair, + &validator_vote_keypair.node_keypair, + &validator_vote_keypair.vote_keypair, + &validator_vote_keypair.vote_keypair, // authorized_voter_keypair hash, ) } fn run_test_verify_votes_1_pass(hash: Option) { - let vote_tx = test_vote_tx(hash); + let voting_keypairs: Vec<_> = repeat_with(ValidatorVoteKeypairs::new_rand) + .take(10) + .collect(); + let GenesisConfigInfo { genesis_config, .. } = + genesis_utils::create_genesis_config_with_vote_accounts( + 10_000, // mint_lamports + &voting_keypairs, + vec![100; voting_keypairs.len()], // stakes + ); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = RwLock::new(BankForks::new(bank)); + let vote_tx = test_vote_tx(voting_keypairs.first(), hash); let votes = vec![vote_tx]; - let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes); + let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, &bank_forks); assert_eq!(vote_txs.len(), 1); verify_packets_len(&packets, 1); } @@ -1861,11 +1879,22 @@ mod tests { } fn run_test_bad_vote(hash: Option) { - let vote_tx = test_vote_tx(hash); + let voting_keypairs: Vec<_> = repeat_with(ValidatorVoteKeypairs::new_rand) + .take(10) + .collect(); + let GenesisConfigInfo { genesis_config, .. } = + genesis_utils::create_genesis_config_with_vote_accounts( + 10_000, // mint_lamports + &voting_keypairs, + vec![100; voting_keypairs.len()], // stakes + ); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = RwLock::new(BankForks::new(bank)); + let vote_tx = test_vote_tx(voting_keypairs.first(), hash); let mut bad_vote = vote_tx.clone(); bad_vote.signatures[0] = Signature::default(); let votes = vec![vote_tx.clone(), bad_vote, vote_tx]; - let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes); + let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, &bank_forks); assert_eq!(vote_txs.len(), 2); verify_packets_len(&packets, 2); } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index d2afd1829a..f34c4e6cf3 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -110,10 +110,10 @@ impl Tpu { let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) = unbounded(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( - exit, + exit.clone(), cluster_info.clone(), verified_gossip_vote_packets_sender, - poh_recorder, + poh_recorder.clone(), vote_tracker, bank_forks.clone(), subscriptions.clone(),