checks for authorized voter early on in the vote-listener pipeline (#22169)

Before votes are verified that they are signed by the authorized voter,
they might be dropped in verified-vote-packets code. If there are
enough many spam votes from unauthorized voters, this may potentially
drop valid votes but keep the false ones.
https://github.com/solana-labs/solana/blob/57986f982/core/src/verified_vote_packets.rs#L165-L168
This commit is contained in:
behzad nouri 2021-12-30 15:03:14 +00:00 committed by GitHub
parent edb20d6909
commit c0c6038654
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 101 additions and 72 deletions

View File

@ -13,7 +13,6 @@ use {
unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Select, unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Select,
Sender as CrossbeamSender, Sender as CrossbeamSender,
}, },
itertools::izip,
log::*, log::*,
solana_gossip::{ solana_gossip::{
cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
@ -297,10 +296,10 @@ pub struct ClusterInfoVoteListener {
impl ClusterInfoVoteListener { impl ClusterInfoVoteListener {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
exit: &Arc<AtomicBool>, exit: Arc<AtomicBool>,
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
verified_packets_sender: CrossbeamSender<Vec<PacketBatch>>, verified_packets_sender: CrossbeamSender<Vec<PacketBatch>>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: Arc<Mutex<PohRecorder>>,
vote_tracker: Arc<VoteTracker>, vote_tracker: Arc<VoteTracker>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
subscriptions: Arc<RpcSubscriptions>, subscriptions: Arc<RpcSubscriptions>,
@ -311,25 +310,26 @@ impl ClusterInfoVoteListener {
bank_notification_sender: Option<BankNotificationSender>, bank_notification_sender: Option<BankNotificationSender>,
cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender, cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
) -> Self { ) -> Self {
let exit_ = exit.clone();
let (verified_vote_label_packets_sender, verified_vote_label_packets_receiver) = let (verified_vote_label_packets_sender, verified_vote_label_packets_receiver) =
unbounded(); unbounded();
let (verified_vote_transactions_sender, verified_vote_transactions_receiver) = unbounded(); let (verified_vote_transactions_sender, verified_vote_transactions_receiver) = unbounded();
let listen_thread = Builder::new() let listen_thread = {
.name("solana-cluster_info_vote_listener".to_string()) let exit = exit.clone();
.spawn(move || { let bank_forks = bank_forks.clone();
let _ = Self::recv_loop( Builder::new()
exit_, .name("solana-cluster_info_vote_listener".to_string())
&cluster_info, .spawn(move || {
verified_vote_label_packets_sender, let _ = Self::recv_loop(
verified_vote_transactions_sender, exit,
); &cluster_info,
}) &bank_forks,
.unwrap(); verified_vote_label_packets_sender,
verified_vote_transactions_sender,
);
})
.unwrap()
};
let exit_ = exit.clone(); let exit_ = exit.clone();
let poh_recorder = poh_recorder.clone();
let bank_send_thread = Builder::new() let bank_send_thread = Builder::new()
.name("solana-cluster_info_bank_send".to_string()) .name("solana-cluster_info_bank_send".to_string())
.spawn(move || { .spawn(move || {
@ -342,12 +342,11 @@ impl ClusterInfoVoteListener {
}) })
.unwrap(); .unwrap();
let exit_ = exit.clone();
let send_thread = Builder::new() let send_thread = Builder::new()
.name("solana-cluster_info_process_votes".to_string()) .name("solana-cluster_info_process_votes".to_string())
.spawn(move || { .spawn(move || {
let _ = Self::process_votes_loop( let _ = Self::process_votes_loop(
exit_, exit,
verified_vote_transactions_receiver, verified_vote_transactions_receiver,
vote_tracker, vote_tracker,
bank_forks, bank_forks,
@ -367,16 +366,14 @@ impl ClusterInfoVoteListener {
} }
} }
pub fn join(self) -> thread::Result<()> { pub(crate) fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls { self.thread_hdls.into_iter().try_for_each(JoinHandle::join)
thread_hdl.join()?;
}
Ok(())
} }
fn recv_loop( fn recv_loop(
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
bank_forks: &RwLock<BankForks>,
verified_vote_label_packets_sender: VerifiedLabelVotePacketsSender, verified_vote_label_packets_sender: VerifiedLabelVotePacketsSender,
verified_vote_transactions_sender: VerifiedVoteTransactionsSender, verified_vote_transactions_sender: VerifiedVoteTransactionsSender,
) -> Result<()> { ) -> Result<()> {
@ -385,7 +382,7 @@ impl ClusterInfoVoteListener {
let votes = cluster_info.get_votes(&mut cursor); let votes = cluster_info.get_votes(&mut cursor);
inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len()); inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len());
if !votes.is_empty() { if !votes.is_empty() {
let (vote_txs, packets) = Self::verify_votes(votes); let (vote_txs, packets) = Self::verify_votes(votes, bank_forks);
verified_vote_transactions_sender.send(vote_txs)?; verified_vote_transactions_sender.send(vote_txs)?;
verified_vote_label_packets_sender.send(packets)?; verified_vote_label_packets_sender.send(packets)?;
} }
@ -395,43 +392,45 @@ impl ClusterInfoVoteListener {
} }
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
fn verify_votes(votes: Vec<Transaction>) -> (Vec<Transaction>, Vec<VerifiedVoteMetadata>) { fn verify_votes(
votes: Vec<Transaction>,
bank_forks: &RwLock<BankForks>,
) -> (Vec<Transaction>, Vec<VerifiedVoteMetadata>) {
let mut packet_batches = packet::to_packet_batches(&votes, 1); let mut packet_batches = packet::to_packet_batches(&votes, 1);
// Votes should already be filtered by this point. // Votes should already be filtered by this point.
let reject_non_vote = false; sigverify::ed25519_verify_cpu(&mut packet_batches, /*reject_non_vote=*/ false);
sigverify::ed25519_verify_cpu(&mut packet_batches, reject_non_vote); let root_bank = bank_forks.read().unwrap().root_bank();
let epoch_schedule = root_bank.epoch_schedule();
let (vote_txs, vote_metadata) = izip!(votes.into_iter(), packet_batches) votes
.filter_map(|(vote_tx, packet_batch)| { .into_iter()
let (vote, vote_account_key) = vote_transaction::parse_vote_transaction(&vote_tx) .zip(packet_batches)
.and_then(|(vote_account_key, vote, _)| { .filter(|(_, packet_batch)| {
if vote.slots().is_empty() {
None
} else {
Some((vote, vote_account_key))
}
})?;
// to_packet_batches() above splits into 1 packet long batches // to_packet_batches() above splits into 1 packet long batches
assert_eq!(packet_batch.packets.len(), 1); assert_eq!(packet_batch.packets.len(), 1);
if !packet_batch.packets[0].meta.discard { !packet_batch.packets[0].meta.discard
if let Some(signature) = vote_tx.signatures.first().cloned() {
return Some((
vote_tx,
VerifiedVoteMetadata {
vote_account_key,
vote,
packet_batch,
signature,
},
));
}
}
None
}) })
.unzip(); .filter_map(|(tx, packet_batch)| {
(vote_txs, vote_metadata) let (vote_account_key, vote, _) = vote_transaction::parse_vote_transaction(&tx)?;
let slot = vote.last_voted_slot()?;
let epoch = epoch_schedule.get_epoch(slot);
let authorized_voter = root_bank
.epoch_stakes(epoch)?
.epoch_authorized_voters()
.get(&vote_account_key)?;
let mut keys = tx.message.account_keys.iter().enumerate();
if !keys.any(|(i, key)| tx.message.is_signer(i) && key == authorized_voter) {
return None;
}
let verified_vote_metadata = VerifiedVoteMetadata {
vote_account_key,
vote,
packet_batch,
signature: *tx.signatures.first()?,
};
Some((tx, verified_vote_metadata))
})
.unzip()
} }
fn bank_send_loop( fn bank_send_loop(
@ -558,7 +557,7 @@ impl ClusterInfoVoteListener {
return Ok(()); return Ok(());
} }
let root_bank = bank_forks.read().unwrap().root_bank().clone(); let root_bank = bank_forks.read().unwrap().root_bank();
if last_process_root.elapsed().as_millis() > DEFAULT_MS_PER_SLOT as u128 { if last_process_root.elapsed().as_millis() > DEFAULT_MS_PER_SLOT as u128 {
let unrooted_optimistic_slots = confirmation_verifier let unrooted_optimistic_slots = confirmation_verifier
.verify_for_unrooted_optimistic_slots(&root_bank, &blockstore); .verify_for_unrooted_optimistic_slots(&root_bank, &blockstore);
@ -965,6 +964,7 @@ mod tests {
solana_vote_program::vote_state::Vote, solana_vote_program::vote_state::Vote,
std::{ std::{
collections::BTreeSet, collections::BTreeSet,
iter::repeat_with,
sync::{atomic::AtomicU64, Arc}, sync::{atomic::AtomicU64, Arc},
}, },
}; };
@ -1817,8 +1817,11 @@ mod tests {
#[test] #[test]
fn test_verify_votes_empty() { fn test_verify_votes_empty() {
solana_logger::setup(); solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = RwLock::new(BankForks::new(bank));
let votes = vec![]; let votes = vec![];
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes); let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, &bank_forks);
assert!(vote_txs.is_empty()); assert!(vote_txs.is_empty());
assert!(packets.is_empty()); assert!(packets.is_empty());
} }
@ -1831,25 +1834,40 @@ mod tests {
assert_eq!(num_packets, ref_value); assert_eq!(num_packets, ref_value);
} }
fn test_vote_tx(hash: Option<Hash>) -> Transaction { fn test_vote_tx(
let node_keypair = Keypair::new(); validator_vote_keypairs: Option<&ValidatorVoteKeypairs>,
let vote_keypair = Keypair::new(); hash: Option<Hash>,
let auth_voter_keypair = Keypair::new(); ) -> Transaction {
let other = ValidatorVoteKeypairs::new_rand();
let validator_vote_keypair = validator_vote_keypairs.unwrap_or(&other);
// TODO authorized_voter_keypair should be different from vote-keypair
// but that is what create_genesis_... currently generates.
vote_transaction::new_vote_transaction( vote_transaction::new_vote_transaction(
vec![0], vec![0],
Hash::default(), Hash::default(),
Hash::default(), Hash::default(),
&node_keypair, &validator_vote_keypair.node_keypair,
&vote_keypair, &validator_vote_keypair.vote_keypair,
&auth_voter_keypair, &validator_vote_keypair.vote_keypair, // authorized_voter_keypair
hash, hash,
) )
} }
fn run_test_verify_votes_1_pass(hash: Option<Hash>) { fn run_test_verify_votes_1_pass(hash: Option<Hash>) {
let vote_tx = test_vote_tx(hash); let voting_keypairs: Vec<_> = repeat_with(ValidatorVoteKeypairs::new_rand)
.take(10)
.collect();
let GenesisConfigInfo { genesis_config, .. } =
genesis_utils::create_genesis_config_with_vote_accounts(
10_000, // mint_lamports
&voting_keypairs,
vec![100; voting_keypairs.len()], // stakes
);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = RwLock::new(BankForks::new(bank));
let vote_tx = test_vote_tx(voting_keypairs.first(), hash);
let votes = vec![vote_tx]; let votes = vec![vote_tx];
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes); let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, &bank_forks);
assert_eq!(vote_txs.len(), 1); assert_eq!(vote_txs.len(), 1);
verify_packets_len(&packets, 1); verify_packets_len(&packets, 1);
} }
@ -1861,11 +1879,22 @@ mod tests {
} }
fn run_test_bad_vote(hash: Option<Hash>) { fn run_test_bad_vote(hash: Option<Hash>) {
let vote_tx = test_vote_tx(hash); let voting_keypairs: Vec<_> = repeat_with(ValidatorVoteKeypairs::new_rand)
.take(10)
.collect();
let GenesisConfigInfo { genesis_config, .. } =
genesis_utils::create_genesis_config_with_vote_accounts(
10_000, // mint_lamports
&voting_keypairs,
vec![100; voting_keypairs.len()], // stakes
);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = RwLock::new(BankForks::new(bank));
let vote_tx = test_vote_tx(voting_keypairs.first(), hash);
let mut bad_vote = vote_tx.clone(); let mut bad_vote = vote_tx.clone();
bad_vote.signatures[0] = Signature::default(); bad_vote.signatures[0] = Signature::default();
let votes = vec![vote_tx.clone(), bad_vote, vote_tx]; let votes = vec![vote_tx.clone(), bad_vote, vote_tx];
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes); let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, &bank_forks);
assert_eq!(vote_txs.len(), 2); assert_eq!(vote_txs.len(), 2);
verify_packets_len(&packets, 2); verify_packets_len(&packets, 2);
} }

View File

@ -110,10 +110,10 @@ impl Tpu {
let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) = let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) =
unbounded(); unbounded();
let cluster_info_vote_listener = ClusterInfoVoteListener::new( let cluster_info_vote_listener = ClusterInfoVoteListener::new(
exit, exit.clone(),
cluster_info.clone(), cluster_info.clone(),
verified_gossip_vote_packets_sender, verified_gossip_vote_packets_sender,
poh_recorder, poh_recorder.clone(),
vote_tracker, vote_tracker,
bank_forks.clone(), bank_forks.clone(),
subscriptions.clone(), subscriptions.clone(),