From b30c94ce55d0aa1369bf99fd79cb247329fe7ecd Mon Sep 17 00:00:00 2001 From: carllin Date: Thu, 18 Nov 2021 15:20:41 -0800 Subject: [PATCH] ClusterInfoVoteListener send only missing votes to BankingStage (#20873) --- ci/buildkite-pipeline.sh | 2 +- core/src/cluster_info_vote_listener.rs | 287 ++++++++++--- core/src/replay_stage.rs | 12 +- core/src/verified_vote_packets.rs | 561 +++++++++++++++++++------ gossip/src/cluster_info.rs | 42 +- gossip/tests/gossip.rs | 1 - local-cluster/tests/local_cluster.rs | 120 +++++- runtime/src/bank.rs | 19 - sdk/program/src/slot_hashes.rs | 3 + sdk/src/signature.rs | 5 + 10 files changed, 813 insertions(+), 239 deletions(-) diff --git a/ci/buildkite-pipeline.sh b/ci/buildkite-pipeline.sh index 923b4fbc92..c289df3354 100755 --- a/ci/buildkite-pipeline.sh +++ b/ci/buildkite-pipeline.sh @@ -243,7 +243,7 @@ EOF command_step "local-cluster" \ ". ci/rust-version.sh; ci/docker-run.sh \$\$rust_stable_docker_image ci/test-local-cluster.sh" \ - 45 + 50 } pull_or_push_steps() { diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 3c1b4d9708..c528c39951 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -3,7 +3,9 @@ use crate::{ replay_stage::DUPLICATE_THRESHOLD, result::{Error, Result}, sigverify, - verified_vote_packets::VerifiedVotePackets, + verified_vote_packets::{ + ValidatorGossipVotesIterator, VerifiedVoteMetadata, VerifiedVotePackets, + }, vote_stake_tracker::VoteStakeTracker, }; use crossbeam_channel::{ @@ -14,9 +16,9 @@ use log::*; use solana_gossip::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, crds::Cursor, - crds_value::CrdsValueLabel, }; use solana_ledger::blockstore::Blockstore; +use solana_measure::measure::Measure; use solana_metrics::inc_new_counter_debug; use solana_perf::packet::{self, Packets}; use solana_poh::poh_recorder::PohRecorder; @@ -36,11 +38,13 @@ use solana_sdk::{ epoch_schedule::EpochSchedule, hash::Hash, 
pubkey::Pubkey, + signature::Signature, + slot_hashes, transaction::Transaction, }; use solana_vote_program::{self, vote_state::Vote, vote_transaction}; use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, sync::{ atomic::{AtomicBool, Ordering}, {Arc, Mutex, RwLock}, @@ -52,8 +56,8 @@ use std::{ // Map from a vote account to the authorized voter for an epoch pub type ThresholdConfirmedSlots = Vec<(Slot, Hash)>; pub type VotedHashUpdates = HashMap>; -pub type VerifiedLabelVotePacketsSender = CrossbeamSender>; -pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver>; +pub type VerifiedLabelVotePacketsSender = CrossbeamSender>; +pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver>; pub type VerifiedVoteTransactionsSender = CrossbeamSender>; pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver>; pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec)>; @@ -64,6 +68,7 @@ pub type GossipDuplicateConfirmedSlotsSender = CrossbeamSender; const THRESHOLDS_TO_CHECK: [f64; 2] = [DUPLICATE_THRESHOLD, VOTE_THRESHOLD_SIZE]; +const BANK_SEND_VOTES_LOOP_SLEEP_MS: u128 = 10; #[derive(Default)] pub struct SlotVoteTracker { @@ -240,6 +245,45 @@ impl VoteTracker { } } +struct BankVoteSenderState { + bank: Arc, + previously_sent_to_bank_votes: HashSet, + bank_send_votes_stats: BankSendVotesStats, +} + +impl BankVoteSenderState { + fn new(bank: Arc) -> Self { + Self { + bank, + previously_sent_to_bank_votes: HashSet::new(), + bank_send_votes_stats: BankSendVotesStats::default(), + } + } + + fn report_metrics(&self) { + self.bank_send_votes_stats.report_metrics(self.bank.slot()); + } +} + +#[derive(Default)] +struct BankSendVotesStats { + num_votes_sent: usize, + num_batches_sent: usize, + total_elapsed: u64, +} + +impl BankSendVotesStats { + fn report_metrics(&self, slot: Slot) { + datapoint_info!( + "cluster_info_vote_listener-bank-send-vote-stats", + ("slot", slot, i64), + ("num_votes_sent", self.num_votes_sent, i64), + ("total_elapsed", 
self.total_elapsed, i64), + ("num_batches_sent", self.num_batches_sent, i64), + ); + } +} + pub struct ClusterInfoVoteListener { thread_hdls: Vec>, } @@ -332,10 +376,10 @@ impl ClusterInfoVoteListener { ) -> Result<()> { let mut cursor = Cursor::default(); while !exit.load(Ordering::Relaxed) { - let (labels, votes) = cluster_info.get_votes(&mut cursor); + let votes = cluster_info.get_votes(&mut cursor); inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len()); if !votes.is_empty() { - let (vote_txs, packets) = Self::verify_votes(votes, labels); + let (vote_txs, packets) = Self::verify_votes(votes); verified_vote_transactions_sender.send(vote_txs)?; verified_vote_label_packets_sender.send(packets)?; } @@ -345,31 +389,43 @@ impl ClusterInfoVoteListener { } #[allow(clippy::type_complexity)] - fn verify_votes( - votes: Vec, - labels: Vec, - ) -> (Vec, Vec<(CrdsValueLabel, Slot, Packets)>) { + fn verify_votes(votes: Vec) -> (Vec, Vec) { let mut msgs = packet::to_packets_chunked(&votes, 1); // Votes should already be filtered by this point. 
let reject_non_vote = false; sigverify::ed25519_verify_cpu(&mut msgs, reject_non_vote); - let (vote_txs, packets) = izip!(labels.into_iter(), votes.into_iter(), msgs,) - .filter_map(|(label, vote, packet)| { - let slot = vote_transaction::parse_vote_transaction(&vote) - .and_then(|(_, vote, _)| vote.slots.last().copied())?; + let (vote_txs, vote_metadata) = izip!(votes.into_iter(), msgs,) + .filter_map(|(vote_tx, packet)| { + let (vote, vote_account_key) = vote_transaction::parse_vote_transaction(&vote_tx) + .and_then(|(vote_account_key, vote, _)| { + if vote.slots.is_empty() { + None + } else { + Some((vote, vote_account_key)) + } + })?; // to_packets_chunked() above split into 1 packet long chunks assert_eq!(packet.packets.len(), 1); if !packet.packets[0].meta.discard { - Some((vote, (label, slot, packet))) - } else { - None + if let Some(signature) = vote_tx.signatures.first().cloned() { + return Some(( + vote_tx, + VerifiedVoteMetadata { + vote_account_key, + vote, + packet, + signature, + }, + )); + } } + None }) .unzip(); - (vote_txs, packets) + (vote_txs, vote_metadata) } fn bank_send_loop( @@ -380,7 +436,8 @@ impl ClusterInfoVoteListener { ) -> Result<()> { let mut verified_vote_packets = VerifiedVotePackets::default(); let mut time_since_lock = Instant::now(); - let mut update_version = 0; + let mut bank_vote_sender_state_option: Option = None; + loop { if exit.load(Ordering::Relaxed) { return Ok(()); @@ -389,43 +446,89 @@ impl ClusterInfoVoteListener { let would_be_leader = poh_recorder .lock() .unwrap() - .would_be_leader(20 * DEFAULT_TICKS_PER_SLOT); + .would_be_leader(3 * slot_hashes::MAX_ENTRIES as u64 * DEFAULT_TICKS_PER_SLOT); + if let Err(e) = verified_vote_packets.receive_and_process_vote_packets( &verified_vote_label_packets_receiver, - &mut update_version, would_be_leader, ) { match e { - Error::CrossbeamRecvTimeout(RecvTimeoutError::Disconnected) => { - return Ok(()); - } - Error::CrossbeamRecvTimeout(RecvTimeoutError::Timeout) => (), + 
Error::CrossbeamRecvTimeout(RecvTimeoutError::Disconnected) + | Error::ReadyTimeout => (), _ => { error!("thread {:?} error {:?}", thread::current().name(), e); } } } - if time_since_lock.elapsed().as_millis() > GOSSIP_SLEEP_MILLIS as u128 { - let bank = poh_recorder.lock().unwrap().bank(); - if let Some(bank) = bank { - let last_version = bank.last_vote_sync.load(Ordering::Relaxed); - let (new_version, msgs) = verified_vote_packets.get_latest_votes(last_version); - inc_new_counter_info!("bank_send_loop_batch_size", msgs.packets.len()); - inc_new_counter_info!("bank_send_loop_num_batches", 1); - verified_packets_sender.send(vec![msgs])?; - #[allow(deprecated)] - bank.last_vote_sync.compare_and_swap( - last_version, - new_version, - Ordering::Relaxed, - ); - time_since_lock = Instant::now(); + if time_since_lock.elapsed().as_millis() > BANK_SEND_VOTES_LOOP_SLEEP_MS as u128 { + // Always set this to avoid taking the poh lock too often + time_since_lock = Instant::now(); + // We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS` + if let Some(current_working_bank) = poh_recorder.lock().unwrap().bank() { + Self::check_for_leader_bank_and_send_votes( + &mut bank_vote_sender_state_option, + current_working_bank, + verified_packets_sender, + &verified_vote_packets, + )?; } } } } + fn check_for_leader_bank_and_send_votes( + bank_vote_sender_state_option: &mut Option, + current_working_bank: Arc, + verified_packets_sender: &CrossbeamSender>, + verified_vote_packets: &VerifiedVotePackets, + ) -> Result<()> { + // We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS` + if let Some(bank_vote_sender_state) = bank_vote_sender_state_option { + if bank_vote_sender_state.bank.slot() != current_working_bank.slot() { + bank_vote_sender_state.report_metrics(); + *bank_vote_sender_state_option = + Some(BankVoteSenderState::new(current_working_bank)); + } + } else { + *bank_vote_sender_state_option = 
Some(BankVoteSenderState::new(current_working_bank)); + } + + let bank_vote_sender_state = bank_vote_sender_state_option.as_mut().unwrap(); + let BankVoteSenderState { + ref bank, + ref mut bank_send_votes_stats, + ref mut previously_sent_to_bank_votes, + } = bank_vote_sender_state; + + // This logic may run multiple times for the same leader bank, + // we just have to ensure that the same votes are not sent + // to the bank multiple times, which is guaranteed by + // `previously_sent_to_bank_votes` + let gossip_votes_iterator = ValidatorGossipVotesIterator::new( + bank.clone(), + verified_vote_packets, + previously_sent_to_bank_votes, + ); + + let mut filter_gossip_votes_timing = Measure::start("filter_gossip_votes"); + + // Send entire batch at a time so that there is no partial processing of + // a single validator's votes by two different banks. This might happen + // if we sent each vote individually, for instance if we created two different + // leader banks from the same common parent, one leader bank may process + // only the later votes and ignore the earlier votes. 
+ for single_validator_votes in gossip_votes_iterator { + bank_send_votes_stats.num_votes_sent += single_validator_votes.len(); + bank_send_votes_stats.num_batches_sent += 1; + verified_packets_sender.send(single_validator_votes)?; + } + filter_gossip_votes_timing.stop(); + bank_send_votes_stats.total_elapsed += filter_gossip_votes_timing.as_us(); + + Ok(()) + } + #[allow(clippy::too_many_arguments)] fn process_votes_loop( exit: Arc, @@ -481,8 +584,10 @@ impl ClusterInfoVoteListener { .add_new_optimistic_confirmed_slots(confirmed_slots.clone()); } Err(e) => match e { - Error::CrossbeamRecvTimeout(RecvTimeoutError::Timeout) - | Error::ReadyTimeout => (), + Error::CrossbeamRecvTimeout(RecvTimeoutError::Disconnected) => { + return Ok(()); + } + Error::ReadyTimeout => (), _ => { error!("thread {:?} error {:?}", thread::current().name(), e); } @@ -840,15 +945,17 @@ mod tests { use solana_runtime::{ bank::Bank, commitment::BlockCommitmentCache, - genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}, + genesis_utils::{self, create_genesis_config, GenesisConfigInfo, ValidatorVoteKeypairs}, vote_sender_types::ReplayVoteSender, }; use solana_sdk::{ hash::Hash, + pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }; use solana_vote_program::vote_state::Vote; use std::collections::BTreeSet; + use std::sync::Arc; #[test] fn test_max_vote_tx_fits() { @@ -1694,14 +1801,16 @@ mod tests { fn test_verify_votes_empty() { solana_logger::setup(); let votes = vec![]; - let labels = vec![]; - let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, labels); + let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes); assert!(vote_txs.is_empty()); assert!(packets.is_empty()); } - fn verify_packets_len(packets: &[(CrdsValueLabel, Slot, Packets)], ref_value: usize) { - let num_packets: usize = packets.iter().map(|(_, _, p)| p.packets.len()).sum(); + fn verify_packets_len(packets: &[VerifiedVoteMetadata], ref_value: usize) { + let num_packets: 
usize = packets + .iter() + .map(|vote_metadata| vote_metadata.packet.packets.len()) + .sum(); assert_eq!(num_packets, ref_value); } @@ -1723,8 +1832,7 @@ mod tests { fn run_test_verify_votes_1_pass(hash: Option) { let vote_tx = test_vote_tx(hash); let votes = vec![vote_tx]; - let labels = vec![CrdsValueLabel::Vote(0, solana_sdk::pubkey::new_rand())]; - let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, labels); + let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes); assert_eq!(vote_txs.len(), 1); verify_packets_len(&packets, 1); } @@ -1740,9 +1848,7 @@ mod tests { let mut bad_vote = vote_tx.clone(); bad_vote.signatures[0] = Signature::default(); let votes = vec![vote_tx.clone(), bad_vote, vote_tx]; - let label = CrdsValueLabel::Vote(0, solana_sdk::pubkey::new_rand()); - let labels: Vec<_> = (0..votes.len()).map(|_| label.clone()).collect(); - let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, labels); + let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes); assert_eq!(vote_txs.len(), 2); verify_packets_len(&packets, 2); } @@ -1767,4 +1873,79 @@ mod tests { run_test_bad_vote(None); run_test_bad_vote(Some(Hash::default())); } + + #[test] + fn test_check_for_leader_bank_and_send_votes() { + let GenesisConfigInfo { genesis_config, .. 
} = create_genesis_config(1000); + let current_leader_bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let mut bank_vote_sender_state_option: Option<BankVoteSenderState> = None; + let verified_vote_packets = VerifiedVotePackets::default(); + let (verified_packets_sender, _verified_packets_receiver) = unbounded(); + + // 1) If we hand over a `current_leader_bank`, vote sender state should be updated + ClusterInfoVoteListener::check_for_leader_bank_and_send_votes( + &mut bank_vote_sender_state_option, + current_leader_bank.clone(), + &verified_packets_sender, + &verified_vote_packets, + ) + .unwrap(); + + assert_eq!( + bank_vote_sender_state_option.as_ref().unwrap().bank.slot(), + current_leader_bank.slot() + ); + bank_vote_sender_state_option + .as_mut() + .unwrap() + .previously_sent_to_bank_votes + .insert(Signature::new_unique()); + + // 2) Handing over the same leader bank again should not update the state + ClusterInfoVoteListener::check_for_leader_bank_and_send_votes( + &mut bank_vote_sender_state_option, + current_leader_bank.clone(), + &verified_packets_sender, + &verified_vote_packets, + ) + .unwrap(); + // If we hand over a `current_leader_bank`, vote sender state should be updated + assert_eq!( + bank_vote_sender_state_option.as_ref().unwrap().bank.slot(), + current_leader_bank.slot() + ); + assert_eq!( + bank_vote_sender_state_option + .as_ref() + .unwrap() + .previously_sent_to_bank_votes + .len(), + 1 + ); + + let current_leader_bank = Arc::new(Bank::new_from_parent( + &current_leader_bank, + &Pubkey::default(), + current_leader_bank.slot() + 1, + )); + ClusterInfoVoteListener::check_for_leader_bank_and_send_votes( + &mut bank_vote_sender_state_option, + current_leader_bank.clone(), + &verified_packets_sender, + &verified_vote_packets, + ) + .unwrap(); + + // 3) If we hand over a new `current_leader_bank`, vote sender state should be updated + // to the new bank + assert_eq!( + bank_vote_sender_state_option.as_ref().unwrap().bank.slot(), + current_leader_bank.slot() + ); 
+ assert!(bank_vote_sender_state_option + .as_ref() + .unwrap() + .previously_sent_to_bank_votes + .is_empty()); + } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 420a710372..62684d008b 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -5642,7 +5642,7 @@ pub mod tests { ); let mut cursor = Cursor::default(); - let (_, votes) = cluster_info.get_votes(&mut cursor); + let votes = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); let vote_tx = &votes[0]; assert_eq!(vote_tx.message.recent_blockhash, bank0.last_blockhash()); @@ -5671,7 +5671,7 @@ pub mod tests { ); // No new votes have been submitted to gossip - let (_, votes) = cluster_info.get_votes(&mut cursor); + let votes = cluster_info.get_votes(&mut cursor); assert!(votes.is_empty()); // Tower's latest vote tx blockhash hasn't changed either assert_eq!(tower.last_vote_tx_blockhash(), bank0.last_blockhash()); @@ -5704,7 +5704,7 @@ pub mod tests { vote_info, false, ); - let (_, votes) = cluster_info.get_votes(&mut cursor); + let votes = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); let vote_tx = &votes[0]; assert_eq!(vote_tx.message.recent_blockhash, bank1.last_blockhash()); @@ -5727,7 +5727,7 @@ pub mod tests { ); // No new votes have been submitted to gossip - let (_, votes) = cluster_info.get_votes(&mut cursor); + let votes = cluster_info.get_votes(&mut cursor); assert!(votes.is_empty()); assert_eq!(tower.last_vote_tx_blockhash(), bank1.last_blockhash()); assert_eq!(tower.last_voted_slot().unwrap(), 1); @@ -5774,7 +5774,7 @@ pub mod tests { ); assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time); - let (_, votes) = cluster_info.get_votes(&mut cursor); + let votes = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); let vote_tx = &votes[0]; assert_eq!( @@ -5830,7 +5830,7 @@ pub mod tests { &voting_sender, ); - let (_, votes) = cluster_info.get_votes(&mut cursor); + let votes = 
cluster_info.get_votes(&mut cursor); assert!(votes.is_empty()); assert_eq!( vote_tx.message.recent_blockhash, diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index 772aac1775..7df6997620 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -1,195 +1,476 @@ use crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result}; -use solana_gossip::crds_value::CrdsValueLabel; +use crossbeam_channel::Select; use solana_perf::packet::Packets; -use solana_sdk::clock::Slot; +use solana_runtime::bank::Bank; +use solana_sdk::{ + account::from_account, clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signature, + slot_hashes::SlotHashes, sysvar, +}; +use solana_vote_program::vote_state::Vote; use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, + sync::Arc, time::Duration, }; +const MAX_VOTES_PER_VALIDATOR: usize = 1000; + +pub struct VerifiedVoteMetadata { + pub vote_account_key: Pubkey, + pub vote: Vote, + pub packet: Packets, + pub signature: Signature, +} + +pub struct ValidatorGossipVotesIterator<'a> { + my_leader_bank: Arc, + slot_hashes: SlotHashes, + verified_vote_packets: &'a VerifiedVotePackets, + vote_account_keys: Vec, + previously_sent_to_bank_votes: &'a mut HashSet, +} + +impl<'a> ValidatorGossipVotesIterator<'a> { + pub fn new( + my_leader_bank: Arc, + verified_vote_packets: &'a VerifiedVotePackets, + previously_sent_to_bank_votes: &'a mut HashSet, + ) -> Self { + let slot_hashes_account = my_leader_bank.get_account(&sysvar::slot_hashes::id()); + + if slot_hashes_account.is_none() { + warn!( + "Slot hashes sysvar doesn't exist on bank {}", + my_leader_bank.slot() + ); + } + + let slot_hashes_account = slot_hashes_account.unwrap_or_default(); + let slot_hashes = from_account::(&slot_hashes_account).unwrap_or_default(); + + // TODO: my_leader_bank.vote_accounts() may not contain zero-staked validators + // in this epoch, 
but those validators may have stake warming up in the next epoch + let vote_account_keys: Vec = + my_leader_bank.vote_accounts().keys().copied().collect(); + + Self { + my_leader_bank, + slot_hashes, + verified_vote_packets, + vote_account_keys, + previously_sent_to_bank_votes, + } + } +} + +/// Each iteration returns all of the missing votes for a single validator, the votes +/// ordered from smallest to largest. +/// +/// Iterator is done after iterating through all vote accounts +impl<'a> Iterator for ValidatorGossipVotesIterator<'a> { + type Item = Vec; + + fn next(&mut self) -> Option { + // TODO: Maybe prioritize by stake weight + while !self.vote_account_keys.is_empty() { + let vote_account_key = self.vote_account_keys.pop().unwrap(); + // Get all the gossip votes we've queued up for this validator + // that are: + // 1) missing from the current leader bank + // 2) on the same fork + let validator_votes = self + .verified_vote_packets + .0 + .get(&vote_account_key) + .and_then(|validator_gossip_votes| { + // Fetch the validator's vote state from the bank + self.my_leader_bank + .vote_accounts() + .get(&vote_account_key) + .and_then(|(_stake, vote_account)| { + vote_account.vote_state().as_ref().ok().map(|vote_state| { + let start_vote_slot = + vote_state.last_voted_slot().map(|x| x + 1).unwrap_or(0); + // Filter out the votes that are outdated + validator_gossip_votes + .range((start_vote_slot, Hash::default())..) 
+ .filter_map(|((slot, hash), (packet, tx_signature))| { + if self.previously_sent_to_bank_votes.contains(tx_signature) + { + return None; + } + // Don't send the same vote to the same bank multiple times + self.previously_sent_to_bank_votes.insert(*tx_signature); + // Filter out votes on the wrong fork (or too old to be) + // on this fork + if self + .slot_hashes + .get(slot) + .map(|found_hash| found_hash == hash) + .unwrap_or(false) + { + Some(packet.clone()) + } else { + None + } + }) + .collect::>() + }) + }) + }); + if let Some(validator_votes) = validator_votes { + if !validator_votes.is_empty() { + return Some(validator_votes); + } + } + } + None + } +} + +pub type SingleValidatorVotes = BTreeMap<(Slot, Hash), (Packets, Signature)>; + #[derive(Default)] -pub struct VerifiedVotePackets(HashMap); +pub struct VerifiedVotePackets(HashMap); impl VerifiedVotePackets { pub fn receive_and_process_vote_packets( &mut self, vote_packets_receiver: &VerifiedLabelVotePacketsReceiver, - last_update_version: &mut u64, would_be_leader: bool, ) -> Result<()> { - let timer = Duration::from_millis(200); - let vote_packets = vote_packets_receiver.recv_timeout(timer)?; - *last_update_version += 1; - if would_be_leader { - for (label, slot, packet) in vote_packets { - self.0.insert(label, (*last_update_version, slot, packet)); - } - } else { - self.0.clear(); - self.0.shrink_to_fit(); - } - while let Ok(vote_packets) = vote_packets_receiver.try_recv() { + let mut sel = Select::new(); + sel.recv(vote_packets_receiver); + let _ = sel.ready_timeout(Duration::from_millis(200))?; + for gossip_votes in vote_packets_receiver.try_iter() { if would_be_leader { - for (label, slot, packet) in vote_packets { - self.0.insert(label, (*last_update_version, slot, packet)); - } - } - } - Ok(()) - } + for verfied_vote_metadata in gossip_votes { + let VerifiedVoteMetadata { + vote_account_key, + vote, + packet, + signature, + } = verfied_vote_metadata; + if vote.slots.is_empty() { + error!("Empty 
votes should have been filtered out earlier in the pipeline"); + continue; + } + let slot = vote.slots.last().unwrap(); + let hash = vote.hash; - #[cfg(test)] - fn get_vote_packets(&self, key: &CrdsValueLabel) -> Option<&(u64, Slot, Packets)> { - self.0.get(key) - } + let validator_votes = self.0.entry(vote_account_key).or_default(); + validator_votes.insert((*slot, hash), (packet, signature)); - pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, Packets) { - let mut new_update_version = last_update_version; - let mut votes = HashMap::new(); - for (label, (version, slot, packets)) in &self.0 { - new_update_version = std::cmp::max(*version, new_update_version); - if *version <= last_update_version { - continue; - } - match votes.entry(label.pubkey()) { - Entry::Vacant(entry) => { - entry.insert((slot, packets)); - } - Entry::Occupied(mut entry) => { - let (entry_slot, _) = entry.get(); - if *entry_slot < slot { - *entry.get_mut() = (slot, packets); + if validator_votes.len() > MAX_VOTES_PER_VALIDATOR { + let smallest_key = validator_votes.keys().next().cloned().unwrap(); + validator_votes.remove(&smallest_key).unwrap(); } } } } - let packets = votes - .into_iter() - .flat_map(|(_, (_, packets))| &packets.packets) - .cloned() - .collect(); - (new_update_version, Packets::new(packets)) + Ok(()) } } #[cfg(test)] mod tests { use super::*; - use crate::result::Error; - use crossbeam_channel::{unbounded, RecvTimeoutError}; - use solana_perf::packet::{Meta, Packet}; + use crate::{result::Error, vote_simulator::VoteSimulator}; + use crossbeam_channel::unbounded; + use solana_perf::packet::Packet; + use solana_sdk::slot_hashes::MAX_ENTRIES; #[test] - fn test_get_latest_votes() { - let pubkey = solana_sdk::pubkey::new_rand(); - let label1 = CrdsValueLabel::Vote(0, pubkey); - let label2 = CrdsValueLabel::Vote(1, pubkey); + fn test_verified_vote_packets_receive_and_process_vote_packets() { + let (s, r) = unbounded(); + let vote_account_key = 
solana_sdk::pubkey::new_rand(); + + // Construct the buffer let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); - let data = Packet { - meta: Meta { - repair: true, - ..Meta::default() - }, - ..Packet::default() - }; - - let none_empty_packets = Packets::new(vec![data, Packet::default()]); - + // Send a vote from `vote_account_key`, check that it was inserted + let vote_slot = 0; + let vote_hash = Hash::new_unique(); + let vote = Vote::new(vec![vote_slot], vote_hash); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote: vote.clone(), + packet: Packets::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); verified_vote_packets - .0 - .insert(label1, (2, 42, none_empty_packets)); + .receive_and_process_vote_packets(&r, true) + .unwrap(); + assert_eq!( + verified_vote_packets + .0 + .get(&vote_account_key) + .unwrap() + .len(), + 1 + ); + + // Same slot, same hash, should not be inserted + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet: Packets::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); verified_vote_packets - .0 - .insert(label2, (1, 23, Packets::default())); + .receive_and_process_vote_packets(&r, true) + .unwrap(); + assert_eq!( + verified_vote_packets + .0 + .get(&vote_account_key) + .unwrap() + .len(), + 1 + ); - // Both updates have timestamps greater than 0, so both should be returned - let (new_update_version, updates) = verified_vote_packets.get_latest_votes(0); - assert_eq!(new_update_version, 2); - assert_eq!(updates.packets.len(), 2); + // Same slot, different hash, should still be inserted + let new_vote_hash = Hash::new_unique(); + let vote = Vote::new(vec![vote_slot], new_vote_hash); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet: Packets::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); + verified_vote_packets + .receive_and_process_vote_packets(&r, true) + .unwrap(); + assert_eq!( + verified_vote_packets 
+ .0 + .get(&vote_account_key) + .unwrap() + .len(), + 2 + ); - // Only the nonempty packet had a timestamp greater than 1 - let (new_update_version, updates) = verified_vote_packets.get_latest_votes(1); - assert_eq!(new_update_version, 2); - assert!(!updates.packets.is_empty()); + // Different vote slot, should be inserted + let vote_slot = 1; + let vote_hash = Hash::new_unique(); + let vote = Vote::new(vec![vote_slot], vote_hash); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet: Packets::default(), + signature: Signature::new(&[2u8; 64]), + }]) + .unwrap(); + verified_vote_packets + .receive_and_process_vote_packets(&r, true) + .unwrap(); + assert_eq!( + verified_vote_packets + .0 + .get(&vote_account_key) + .unwrap() + .len(), + 3 + ); - // If the given timestamp is greater than all timestamps in any update, - // returned timestamp should be the same as the given timestamp, and - // no updates should be returned - let (new_update_version, updates) = verified_vote_packets.get_latest_votes(3); - assert_eq!(new_update_version, 3); - assert!(updates.is_empty()); + // No new messages, should time out + assert_matches!( + verified_vote_packets.receive_and_process_vote_packets(&r, true), + Err(Error::ReadyTimeout) + ); } #[test] - fn test_get_and_process_vote_packets() { + fn test_verified_vote_packets_receive_and_process_vote_packets_max_len() { let (s, r) = unbounded(); - let pubkey = solana_sdk::pubkey::new_rand(); - let label1 = CrdsValueLabel::Vote(0, pubkey); - let label2 = CrdsValueLabel::Vote(1, pubkey); - let mut update_version = 0; - s.send(vec![(label1.clone(), 17, Packets::default())]) - .unwrap(); - s.send(vec![(label2.clone(), 23, Packets::default())]) - .unwrap(); + let vote_account_key = solana_sdk::pubkey::new_rand(); - let data = Packet { - meta: Meta { - repair: true, - ..Meta::default() - }, - ..Packet::default() - }; + // Construct the buffer + let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); - let 
later_packets = Packets::new(vec![data, Packet::default()]); - s.send(vec![(label1.clone(), 42, later_packets)]).unwrap(); + // Send many more votes than the upper limit per validator + for _ in 0..2 * MAX_VOTES_PER_VALIDATOR { + let vote_slot = 0; + let vote_hash = Hash::new_unique(); + let vote = Vote::new(vec![vote_slot], vote_hash); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet: Packets::default(), + signature: Signature::new(&[1u8; 64]), + }]) + .unwrap(); + } + + // At most `MAX_VOTES_PER_VALIDATOR` should be stored per validator + verified_vote_packets + .receive_and_process_vote_packets(&r, true) + .unwrap(); + assert_eq!( + verified_vote_packets + .0 + .get(&vote_account_key) + .unwrap() + .len(), + MAX_VOTES_PER_VALIDATOR + ); + } + + #[test] + fn test_verified_vote_packets_validator_gossip_votes_iterator_wrong_fork() { + let (s, r) = unbounded(); + let vote_simulator = VoteSimulator::new(1); + let my_leader_bank = vote_simulator.bank_forks.read().unwrap().root_bank(); + let vote_account_key = vote_simulator.vote_pubkeys[0]; + + // Create a bunch of votes with random vote hashes, which should all be ignored + // since they are not on the same fork as `my_leader_bank`, i.e. 
their hashes do + // not exist in the SlotHashes sysvar for `my_leader_bank` + for _ in 0..MAX_VOTES_PER_VALIDATOR { + let vote_slot = 0; + let vote_hash = Hash::new_unique(); + let vote = Vote::new(vec![vote_slot], vote_hash); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet: Packets::default(), + signature: Signature::new_unique(), + }]) + .unwrap(); + } + + // Ingest the votes into the buffer let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); verified_vote_packets - .receive_and_process_vote_packets(&r, &mut update_version, true) + .receive_and_process_vote_packets(&r, true) .unwrap(); - // Test timestamps for same batch are the same - let update_version1 = verified_vote_packets.get_vote_packets(&label1).unwrap().0; - assert_eq!( - update_version1, - verified_vote_packets.get_vote_packets(&label2).unwrap().0 + // Create tracker for previously sent bank votes + let mut previously_sent_to_bank_votes = HashSet::new(); + let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new( + my_leader_bank, + &verified_vote_packets, + &mut previously_sent_to_bank_votes, ); - // Test the later value overwrote the earlier one for this label - assert!( - verified_vote_packets - .get_vote_packets(&label1) - .unwrap() - .2 - .packets - .len() - > 1 - ); - assert_eq!( - verified_vote_packets - .get_vote_packets(&label2) - .unwrap() - .2 - .packets - .len(), - 0 - ); + // Wrong fork, we should get no hashes + assert!(gossip_votes_iterator.next().is_none()); + } - // Test timestamp for next batch overwrites the original - s.send(vec![(label2.clone(), 51, Packets::default())]) - .unwrap(); + #[test] + fn test_verified_vote_packets_validator_gossip_votes_iterator_correct_fork() { + let (s, r) = unbounded(); + let num_validators = 2; + let vote_simulator = VoteSimulator::new(2); + let mut my_leader_bank = vote_simulator.bank_forks.read().unwrap().root_bank(); + + // Create a set of valid ancestor hashes for this fork + for _ in 
0..MAX_ENTRIES { + my_leader_bank = Arc::new(Bank::new_from_parent( + &my_leader_bank, + &Pubkey::default(), + my_leader_bank.slot() + 1, + )); + } + let slot_hashes_account = my_leader_bank + .get_account(&sysvar::slot_hashes::id()) + .expect("Slot hashes sysvar must exist"); + let slot_hashes = from_account::(&slot_hashes_account).unwrap(); + + // Create valid votes + for i in 0..num_validators { + let vote_account_key = vote_simulator.vote_pubkeys[i]; + // Used to uniquely identify the packets for each validator + let num_packets = i + 1; + for (vote_slot, vote_hash) in slot_hashes.slot_hashes().iter() { + let vote = Vote::new(vec![*vote_slot], *vote_hash); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet: Packets::new(vec![Packet::default(); num_packets]), + signature: Signature::new_unique(), + }]) + .unwrap(); + } + } + + // Ingest the votes into the buffer + let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); verified_vote_packets - .receive_and_process_vote_packets(&r, &mut update_version, true) + .receive_and_process_vote_packets(&r, true) .unwrap(); - let update_version2 = verified_vote_packets.get_vote_packets(&label2).unwrap().0; - assert!(update_version2 > update_version1); - // Test empty doesn't bump the version - let before = update_version; - assert_matches!( - verified_vote_packets.receive_and_process_vote_packets(&r, &mut update_version, true), - Err(Error::CrossbeamRecvTimeout(RecvTimeoutError::Timeout)) + // Check we get two batches, one for each validator. 
Each batch + // should only contain a packets structure with the specific number + // of packets associated with that batch + assert_eq!(verified_vote_packets.0.len(), 2); + // Every validator should have `slot_hashes.slot_hashes().len()` votes + assert!(verified_vote_packets + .0 + .values() + .all(|validator_votes| validator_votes.len() == slot_hashes.slot_hashes().len())); + + let mut previously_sent_to_bank_votes = HashSet::new(); + let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new( + my_leader_bank.clone(), + &verified_vote_packets, + &mut previously_sent_to_bank_votes, ); - assert_eq!(before, update_version); + + // Get and verify batches + let num_expected_batches = 2; + for _ in 0..num_expected_batches { + let validator_batch: Vec = gossip_votes_iterator.next().unwrap(); + assert_eq!(validator_batch.len(), slot_hashes.slot_hashes().len()); + let expected_len = validator_batch[0].packets.len(); + assert!(validator_batch + .iter() + .all(|p| p.packets.len() == expected_len)); + } + + // Should be empty now + assert!(gossip_votes_iterator.next().is_none()); + + // If we construct another iterator, should return nothing because `previously_sent_to_bank_votes` + // should filter out everything + let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new( + my_leader_bank.clone(), + &verified_vote_packets, + &mut previously_sent_to_bank_votes, + ); + assert!(gossip_votes_iterator.next().is_none()); + + // If we add a new vote, we should return it + my_leader_bank.freeze(); + let vote_slot = my_leader_bank.slot(); + let vote_hash = my_leader_bank.hash(); + let my_leader_bank = Arc::new(Bank::new_from_parent( + &my_leader_bank, + &Pubkey::default(), + my_leader_bank.slot() + 1, + )); + let vote_account_key = vote_simulator.vote_pubkeys[1]; + let vote = Vote::new(vec![vote_slot], vote_hash); + s.send(vec![VerifiedVoteMetadata { + vote_account_key, + vote, + packet: Packets::default(), + signature: Signature::new_unique(), + }]) + .unwrap(); + 
// Ingest the votes into the buffer + verified_vote_packets + .receive_and_process_vote_packets(&r, true) + .unwrap(); + let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new( + my_leader_bank, + &verified_vote_packets, + &mut previously_sent_to_bank_votes, + ); + assert!(gossip_votes_iterator.next().is_some()); + assert!(gossip_votes_iterator.next().is_none()); } } diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index dc60d0b835..b8fd26525a 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1092,7 +1092,27 @@ impl ClusterInfo { } /// Returns votes inserted since the given cursor. - pub fn get_votes(&self, cursor: &mut Cursor) -> (Vec, Vec) { + pub fn get_votes(&self, cursor: &mut Cursor) -> Vec { + let txs: Vec = self + .time_gossip_read_lock("get_votes", &self.stats.get_votes) + .get_votes(cursor) + .map(|vote| { + let transaction = match &vote.value.data { + CrdsData::Vote(_, vote) => vote.transaction().clone(), + _ => panic!("this should not happen!"), + }; + transaction + }) + .collect(); + inc_new_counter_info!("cluster_info-get_votes-count", txs.len()); + txs + } + + /// Returns votes and the associated labels inserted since the given cursor. 
+ pub fn get_votes_with_labels( + &self, + cursor: &mut Cursor, + ) -> (Vec, Vec) { let (labels, txs): (_, Vec<_>) = self .time_gossip_read_lock("get_votes", &self.stats.get_votes) .get_votes(cursor) @@ -3558,7 +3578,7 @@ mod tests { ); cluster_info.push_vote(&unrefresh_tower, unrefresh_tx.clone()); let mut cursor = Cursor::default(); - let (_, votes) = cluster_info.get_votes(&mut cursor); + let votes = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![unrefresh_tx.clone()]); // Now construct vote for the slot to be refreshed later @@ -3578,9 +3598,9 @@ mod tests { // Trying to refresh vote when it doesn't yet exist in gossip // shouldn't add the vote cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot); - let (_, votes) = cluster_info.get_votes(&mut cursor); + let votes = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![]); - let (_, votes) = cluster_info.get_votes(&mut Cursor::default()); + let votes = cluster_info.get_votes(&mut Cursor::default()); assert_eq!(votes.len(), 1); assert!(votes.contains(&unrefresh_tx)); @@ -3588,7 +3608,7 @@ mod tests { cluster_info.push_vote(&refresh_tower, refresh_tx.clone()); // Should be two votes in gossip - let (_, votes) = cluster_info.get_votes(&mut Cursor::default()); + let votes = cluster_info.get_votes(&mut Cursor::default()); assert_eq!(votes.len(), 2); assert!(votes.contains(&unrefresh_tx)); assert!(votes.contains(&refresh_tx)); @@ -3612,12 +3632,12 @@ mod tests { cluster_info.refresh_vote(latest_refresh_tx.clone(), refresh_slot); } // The diff since `max_ts` should only be the latest refreshed vote - let (_, votes) = cluster_info.get_votes(&mut cursor); + let votes = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); assert_eq!(votes[0], latest_refresh_tx); // Should still be two votes in gossip - let (_, votes) = cluster_info.get_votes(&mut Cursor::default()); + let votes = cluster_info.get_votes(&mut Cursor::default()); assert_eq!(votes.len(), 2); 
assert!(votes.contains(&unrefresh_tx)); assert!(votes.contains(&latest_refresh_tx)); @@ -3636,7 +3656,7 @@ mod tests { // make sure empty crds is handled correctly let mut cursor = Cursor::default(); - let (_, votes) = cluster_info.get_votes(&mut cursor); + let votes = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![]); // add a vote @@ -3656,7 +3676,7 @@ mod tests { let tower = vec![7]; // Last slot in the vote. cluster_info.push_vote(&tower, tx.clone()); - let (labels, votes) = cluster_info.get_votes(&mut cursor); + let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor); assert_eq!(votes, vec![tx]); assert_eq!(labels.len(), 1); match labels[0] { @@ -3667,7 +3687,7 @@ mod tests { _ => panic!("Bad match"), } // make sure timestamp filter works - let (_, votes) = cluster_info.get_votes(&mut cursor); + let votes = cluster_info.get_votes(&mut cursor); assert_eq!(votes, vec![]); } @@ -3687,7 +3707,7 @@ mod tests { #[test] fn test_push_votes_with_tower() { let get_vote_slots = |cluster_info: &ClusterInfo| -> Vec { - let (labels, _) = cluster_info.get_votes(&mut Cursor::default()); + let (labels, _) = cluster_info.get_votes_with_labels(&mut Cursor::default()); let gossip_crds = cluster_info.gossip.crds.read().unwrap(); let mut vote_slots = HashSet::new(); for label in labels { diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index 2365c1caa0..785cd66f9e 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -332,7 +332,6 @@ pub fn cluster_info_scale() { //if node.0.get_votes(0).1.len() != (num_nodes * num_votes) { let has_tx = node .get_votes(&mut Cursor::default()) - .1 .iter() .filter(|v| v.message.account_keys == tx.message.account_keys) .count(); diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index d7f1ba57fd..accd56c912 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -50,7 +50,7 @@ use { solana_sdk::{ 
account::AccountSharedData, client::{AsyncClient, SyncClient}, - clock::{self, Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES}, + clock::{self, Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE}, commitment_config::CommitmentConfig, epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, genesis_config::ClusterType, @@ -1110,12 +1110,8 @@ fn test_fork_choice_refresh_old_votes() { let ticks_per_slot = 8; let on_before_partition_resolved = |cluster: &mut LocalCluster, context: &mut PartitionContext| { - // Equal to ms_per_slot * MAX_RECENT_BLOCKHASHES, rounded up - let sleep_time_ms = - ((ticks_per_slot * DEFAULT_MS_PER_SLOT * MAX_RECENT_BLOCKHASHES as u64) - + DEFAULT_TICKS_PER_SLOT - - 1) - / DEFAULT_TICKS_PER_SLOT; + // Equal to ms_per_slot * MAX_PROCESSING_AGE, rounded up + let sleep_time_ms = ms_for_n_slots(MAX_PROCESSING_AGE as u64, ticks_per_slot); info!("Wait for blockhashes to expire, {} ms", sleep_time_ms); // Wait for blockhashes to expire @@ -2680,7 +2676,7 @@ fn test_duplicate_shreds_broadcast_leader() { return; } - let (labels, votes) = cluster_info.get_votes(&mut cursor); + let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor); let mut parsed_vote_iter: Vec<_> = labels .into_iter() .zip(votes.into_iter()) @@ -3920,6 +3916,107 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) { ); } +#[test] +#[serial] +fn test_votes_land_in_fork_during_long_partition() { + let total_stake = 100; + // Make `lighter_stake` insufficient for switching threshold + let lighter_stake = (SWITCH_FORK_THRESHOLD as f64 * total_stake as f64) as u64; + let heavier_stake = lighter_stake + 1; + let failures_stake = total_stake - lighter_stake - heavier_stake; + + // Give lighter stake 30 consecutive slots before + // the heavier stake gets a single slot + let partitions: &[&[(usize, usize)]] = &[ + &[(heavier_stake as usize, 1)], + &[(lighter_stake as usize, 30)], + ]; + + #[derive(Default)] + struct 
PartitionContext { + heaviest_validator_key: Pubkey, + lighter_validator_key: Pubkey, + heavier_fork_slot: Slot, + } + + let on_partition_start = |_cluster: &mut LocalCluster, + validator_keys: &[Pubkey], + _dead_validator_infos: Vec, + context: &mut PartitionContext| { + // validator_keys[0] is the validator that will be killed, i.e. the validator with + // stake == `failures_stake` + context.heaviest_validator_key = validator_keys[1]; + context.lighter_validator_key = validator_keys[2]; + }; + + let on_before_partition_resolved = + |cluster: &mut LocalCluster, context: &mut PartitionContext| { + let lighter_validator_ledger_path = cluster.ledger_path(&context.lighter_validator_key); + let heavier_validator_ledger_path = + cluster.ledger_path(&context.heaviest_validator_key); + + // Wait for each node to have created and voted on its own partition + loop { + let (heavier_validator_latest_vote_slot, _) = last_vote_in_tower( + &heavier_validator_ledger_path, + &context.heaviest_validator_key, + ) + .unwrap(); + info!( + "Checking heavier validator's last vote {} is on a separate fork", + heavier_validator_latest_vote_slot + ); + let lighter_validator_blockstore = open_blockstore(&lighter_validator_ledger_path); + if lighter_validator_blockstore + .meta(heavier_validator_latest_vote_slot) + .unwrap() + .is_none() + { + context.heavier_fork_slot = heavier_validator_latest_vote_slot; + return; + } + sleep(Duration::from_millis(100)); + } + }; + + let on_partition_resolved = |cluster: &mut LocalCluster, context: &mut PartitionContext| { + let lighter_validator_ledger_path = cluster.ledger_path(&context.lighter_validator_key); + let start = Instant::now(); + let max_wait = ms_for_n_slots(MAX_PROCESSING_AGE as u64, DEFAULT_TICKS_PER_SLOT); + // Wait for the lighter node to switch over and root the `context.heavier_fork_slot` + loop { + assert!( + // Should finish faster than if the cluster were relying on replay vote + // refreshing to refresh the vote on blockhash 
expiration for the vote + // transaction. + !(start.elapsed() > Duration::from_millis(max_wait)), + "Went too long {} ms without a root", + max_wait, + ); + let lighter_validator_blockstore = open_blockstore(&lighter_validator_ledger_path); + if lighter_validator_blockstore.is_root(context.heavier_fork_slot) { + info!( + "Partition resolved, new root made in {}ms", + start.elapsed().as_millis() + ); + return; + } + sleep(Duration::from_millis(100)); + } + }; + + run_kill_partition_switch_threshold( + &[&[(failures_stake as usize, 0)]], + partitions, + None, + None, + PartitionContext::default(), + on_partition_start, + on_before_partition_resolved, + on_partition_resolved, + ); +} + fn setup_transfer_scan_threads( num_starting_accounts: usize, exit: Arc, @@ -4192,3 +4289,10 @@ fn setup_snapshot_validator_config( num_account_paths, ) } + +/// Computes the number of milliseconds `num_blocks` blocks will take given +/// each slot contains `ticks_per_slot` ticks +fn ms_for_n_slots(num_blocks: u64, ticks_per_slot: u64) -> u64 { + ((ticks_per_slot * DEFAULT_MS_PER_SLOT * num_blocks) + DEFAULT_TICKS_PER_SLOT - 1) + / DEFAULT_TICKS_PER_SLOT +} diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 2ddecba175..1395a84599 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1009,9 +1009,6 @@ pub struct Bank { #[allow(clippy::rc_buffer)] feature_builtins: Arc>, - /// Last time when the cluster info vote listener has synced with this bank - pub last_vote_sync: AtomicU64, - /// Protocol-level rewards that were distributed by this bank pub rewards: RwLock>, @@ -1169,7 +1166,6 @@ impl Bank { instruction_processor: InstructionProcessor::default(), compute_budget: Option::::default(), feature_builtins: Arc::>::default(), - last_vote_sync: AtomicU64::default(), rewards: RwLock::>::default(), cluster_type: Option::::default(), lazy_rent_collection: AtomicBool::default(), @@ -1420,7 +1416,6 @@ impl Bank { compute_budget: parent.compute_budget, feature_builtins:
parent.feature_builtins.clone(), hard_forks: parent.hard_forks.clone(), - last_vote_sync: AtomicU64::new(parent.last_vote_sync.load(Relaxed)), rewards: RwLock::new(vec![]), cluster_type: parent.cluster_type, lazy_rent_collection: AtomicBool::new(parent.lazy_rent_collection.load(Relaxed)), @@ -1617,7 +1612,6 @@ impl Bank { instruction_processor: new(), compute_budget: None, feature_builtins: new(), - last_vote_sync: new(), rewards: new(), cluster_type: Some(genesis_config.cluster_type), lazy_rent_collection: new(), @@ -10857,19 +10851,6 @@ pub(crate) mod tests { assert_eq!(sysvar_recent_blockhash, bank_last_blockhash); } - #[test] - fn test_bank_inherit_last_vote_sync() { - let (genesis_config, _) = create_genesis_config(500); - let bank0 = Arc::new(Bank::new_for_tests(&genesis_config)); - let last_ts = bank0.last_vote_sync.load(Relaxed); - assert_eq!(last_ts, 0); - bank0.last_vote_sync.store(1, Relaxed); - let bank1 = - Bank::new_from_parent(&bank0, &Pubkey::default(), bank0.get_slots_in_epoch(0) - 1); - let last_ts = bank1.last_vote_sync.load(Relaxed); - assert_eq!(last_ts, 1); - } - #[test] fn test_hash_internal_state_unchanged() { let (genesis_config, _) = create_genesis_config(500); diff --git a/sdk/program/src/slot_hashes.rs b/sdk/program/src/slot_hashes.rs index 8016505a0a..e87d6ed900 100644 --- a/sdk/program/src/slot_hashes.rs +++ b/sdk/program/src/slot_hashes.rs @@ -34,6 +34,9 @@ impl SlotHashes { slot_hashes.sort_by(|(a, _), (b, _)| b.cmp(a)); Self(slot_hashes) } + pub fn slot_hashes(&self) -> &[SlotHash] { + &self.0 + } } impl FromIterator<(Slot, Hash)> for SlotHashes { diff --git a/sdk/src/signature.rs b/sdk/src/signature.rs index bf4f5e4322..7ac68e4fc3 100644 --- a/sdk/src/signature.rs +++ b/sdk/src/signature.rs @@ -32,6 +32,11 @@ impl Signature { Self(GenericArray::clone_from_slice(signature_slice)) } + pub fn new_unique() -> Self { + let random_bytes: Vec = (0..64).map(|_| rand::random::()).collect(); + Self::new(&random_bytes) + } + pub(self) fn 
verify_verbose( &self, pubkey_bytes: &[u8],