diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 0ca4633d9a..62c46ed6dd 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -287,7 +287,7 @@ fn simulate_process_entries( hash: next_hash(&bank.last_blockhash(), 1, &tx_vector), transactions: tx_vector, }; - process_entries(&bank, &[entry], randomize_txs, None).unwrap(); + process_entries(&bank, &[entry], randomize_txs, None, None).unwrap(); } fn bench_process_entries(randomize_txs: bool, bencher: &mut Bencher) { diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index ef00434516..39ecb3cf77 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,11 +1,9 @@ use crate::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, - consensus::PubkeyVotes, crds_value::CrdsValueLabel, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, poh_recorder::PohRecorder, pubkey_references::LockedPubkeyReferences, - replay_stage::ReplayVotesReceiver, result::{Error, Result}, rpc_subscriptions::RpcSubscriptions, sigverify, @@ -17,7 +15,10 @@ use crossbeam_channel::{ }; use itertools::izip; use log::*; -use solana_ledger::blockstore::Blockstore; +use solana_ledger::{ + blockstore::Blockstore, + blockstore_processor::{ReplayVotesReceiver, ReplayedVote}, +}; use solana_metrics::inc_new_counter_debug; use solana_perf::packet::{self, Packets}; use solana_runtime::{ @@ -34,9 +35,9 @@ use solana_sdk::{ pubkey::Pubkey, transaction::Transaction, }; -use solana_vote_program::{self, vote_transaction}; +use solana_vote_program::{self, vote_state::Vote, vote_transaction}; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, sync::{ atomic::{AtomicBool, Ordering}, {Arc, Mutex, RwLock}, @@ -417,7 +418,7 @@ impl ClusterInfoVoteListener { fn process_votes_loop( exit: Arc, - vote_txs_receiver: VerifiedVoteTransactionsReceiver, + gossip_vote_txs_receiver: VerifiedVoteTransactionsReceiver, vote_tracker: Arc, bank_forks: Arc>, subscriptions: Arc, @@ -449,7 +450,7 @@ impl ClusterInfoVoteListener { last_process_root = Instant::now(); } let optimistic_confirmed_slots = Self::get_and_process_votes( - &vote_txs_receiver, + &gossip_vote_txs_receiver, &vote_tracker, &root_bank, &subscriptions, @@ -475,7 +476,7 @@ impl ClusterInfoVoteListener { #[cfg(test)] pub fn get_and_process_votes_for_tests( - vote_txs_receiver: &VerifiedVoteTransactionsReceiver, + gossip_vote_txs_receiver: &VerifiedVoteTransactionsReceiver, vote_tracker: &VoteTracker, root_bank: &Bank, subscriptions: &RpcSubscriptions, @@ -483,7 +484,7 @@ impl ClusterInfoVoteListener { replay_votes_receiver: &ReplayVotesReceiver, ) -> Result> { Self::get_and_process_votes( - vote_txs_receiver, + gossip_vote_txs_receiver, vote_tracker, root_bank, subscriptions, @@ -493,7 +494,7 @@ impl ClusterInfoVoteListener { } fn get_and_process_votes( - vote_txs_receiver: &VerifiedVoteTransactionsReceiver, + gossip_vote_txs_receiver: &VerifiedVoteTransactionsReceiver, vote_tracker: &VoteTracker, root_bank: &Bank, subscriptions: &RpcSubscriptions, @@ -501,7 +502,7 @@ impl ClusterInfoVoteListener { replay_votes_receiver: &ReplayVotesReceiver, ) -> Result> { let mut sel = Select::new(); - sel.recv(vote_txs_receiver); + sel.recv(gossip_vote_txs_receiver); sel.recv(replay_votes_receiver); let mut remaining_wait_time = 200; loop { @@ -517,16 +518,16 @@ impl ClusterInfoVoteListener { // Should not early return from this point onwards until `process_votes()` // returns below to avoid missing any potential `optimistic_confirmed_slots` - let vote_txs: Vec<_> = vote_txs_receiver.try_iter().flatten().collect(); + let gossip_vote_txs: Vec<_> = gossip_vote_txs_receiver.try_iter().flatten().collect(); let replay_votes: Vec<_> = replay_votes_receiver.try_iter().collect(); - if !vote_txs.is_empty() || !replay_votes.is_empty() { + if !gossip_vote_txs.is_empty() || !replay_votes.is_empty() { return Ok(Self::process_votes( vote_tracker, - vote_txs, + gossip_vote_txs, + replay_votes, root_bank, subscriptions, verified_vote_sender, - &replay_votes, )); } else { remaining_wait_time = remaining_wait_time @@ -536,120 +537,161 @@ impl ClusterInfoVoteListener { Ok(vec![]) } - fn process_votes( + #[allow(clippy::too_many_arguments)] + fn update_new_votes( + vote: Vote, + vote_pubkey: &Pubkey, vote_tracker: &VoteTracker, - vote_txs: Vec, root_bank: &Bank, subscriptions: &RpcSubscriptions, verified_vote_sender: &VerifiedVoteSender, - replay_votes: &[Arc], - ) -> Vec<(Slot, Hash)> { - let mut optimistic_confirmation_counted: HashSet<(Slot, Pubkey)> = HashSet::new(); - let mut diff: HashMap, bool>> = HashMap::new(); - let mut new_optimistic_confirmed_slots = vec![]; - let root = root_bank.slot(); - { - for tx in vote_txs { - if let Some((vote_pubkey, vote, _)) = vote_transaction::parse_vote_transaction(&tx) - { - if vote.slots.is_empty() { - continue; - } - - let last_vote_slot = vote.slots.last().unwrap(); - - // Determine the authorized voter based on the last vote slot. This will - // drop votes from authorized voters trying to make votes for slots - // earlier than the epoch for which they are authorized - let actual_authorized_voter = - vote_tracker.get_authorized_voter(&vote_pubkey, *last_vote_slot); - - if actual_authorized_voter.is_none() { - continue; - } - - // Voting without the correct authorized pubkey, dump the vote - if !VoteTracker::vote_contains_authorized_voter( - &tx, - &actual_authorized_voter.unwrap(), - ) { - continue; - } - - let root = root_bank.slot(); - let last_vote_hash = vote.hash; - for slot in &vote.slots { - // If slot is before the root, or so far ahead we don't have - // stake information, then ignore it - let epoch = root_bank.epoch_schedule().get_epoch(*slot); - let epoch_stakes = root_bank.epoch_stakes(epoch); - if *slot <= root || epoch_stakes.is_none() { - continue; - } - let epoch_stakes = epoch_stakes.unwrap(); - let epoch_vote_accounts = Stakes::vote_accounts(epoch_stakes.stakes()); - let total_epoch_stake = epoch_stakes.total_stake(); - - let unduplicated_pubkey = vote_tracker.keys.get_or_insert(&vote_pubkey); - - // The last vote slot , which is the greatest slot in the stack - // of votes in a vote transaction, qualifies for optimistic confirmation. - let update_optimistic_confirmation_info = if slot == last_vote_slot { - let stake = epoch_vote_accounts - .get(&vote_pubkey) - .map(|(stake, _)| *stake) - .unwrap_or(0); - Some((stake, last_vote_hash)) - } else { - None - }; - - // If this vote for this slot qualifies for optimistic confirmation - if let Some((stake, hash)) = update_optimistic_confirmation_info { - // Fast track processing of the last slot in a vote transactions - // so that notifications for optimistic confirmation can be sent - // as soon as possible. - if !optimistic_confirmation_counted - .contains(&(*slot, *unduplicated_pubkey)) - && Self::add_optimistic_confirmation_vote( - vote_tracker, - *slot, - hash, - unduplicated_pubkey.clone(), - stake, - total_epoch_stake, - ) - { - optimistic_confirmation_counted - .insert((*slot, *unduplicated_pubkey)); - new_optimistic_confirmed_slots.push((*slot, last_vote_hash)); - // TODO: Notify subscribers about new optimistic confirmation - } - } - - diff.entry(*slot) - .or_default() - .insert(unduplicated_pubkey, true); - } - - subscriptions.notify_vote(&vote); - let _ = verified_vote_sender.send((vote_pubkey, vote.slots)); - } - } + diff: &mut HashMap, bool>>, + new_optimistic_confirmed_slots: &mut Vec<(Slot, Hash)>, + is_gossip_vote: bool, + ) { + if vote.slots.is_empty() { + return; } - // Process the replay votes - for votes in replay_votes { - for (pubkey, slot) in votes.iter() { - if *slot <= root { - continue; - } - let unduplicated_pubkey = vote_tracker.keys.get_or_insert(pubkey); - diff.entry(*slot) - .or_default() - .entry(unduplicated_pubkey) - .or_default(); + let last_vote_slot = vote.slots.last().unwrap(); + + let root = root_bank.slot(); + let last_vote_hash = vote.hash; + let mut is_new_vote = false; + for slot in vote.slots.iter().rev() { + // If slot is before the root, or so far ahead we don't have + // stake information, then ignore it + let epoch = root_bank.epoch_schedule().get_epoch(*slot); + let epoch_stakes = root_bank.epoch_stakes(epoch); + if *slot <= root || epoch_stakes.is_none() { + continue; } + let epoch_stakes = epoch_stakes.unwrap(); + let epoch_vote_accounts = Stakes::vote_accounts(epoch_stakes.stakes()); + let total_epoch_stake = epoch_stakes.total_stake(); + let unduplicated_pubkey = vote_tracker.keys.get_or_insert(&vote_pubkey); + + // The last vote slot, which is the greatest slot in the stack + // of votes in a vote transaction, qualifies for optimistic confirmation. + let update_optimistic_confirmation_info = if slot == last_vote_slot { + let stake = epoch_vote_accounts + .get(&vote_pubkey) + .map(|(stake, _)| *stake) + .unwrap_or(0); + Some((stake, last_vote_hash)) + } else { + None + }; + + // If this vote for this slot qualifies for optimistic confirmation + if let Some((stake, hash)) = update_optimistic_confirmation_info { + // Fast track processing of the last slot in a vote transactions + // so that notifications for optimistic confirmation can be sent + // as soon as possible. + let (is_confirmed, is_new) = Self::add_optimistic_confirmation_vote( + vote_tracker, + *slot, + hash, + unduplicated_pubkey.clone(), + stake, + total_epoch_stake, + ); + + if is_confirmed { + new_optimistic_confirmed_slots.push((*slot, last_vote_hash)); + // TODO: Notify subscribers about new optimistic confirmation + } + + if !is_new && !is_gossip_vote { + // By now: + // 1) The vote must have come from ReplayStage, + // 2) We've seen this vote from replay for this hash before + // (`add_optimistic_confirmation_vote()` will not set `is_new == true` + // for same slot different hash), so short circuit because this vote + // has no new information + + // Note gossip votes will always be processed because those should be unique + // and we need to update the gossip-only stake in the `VoteTracker`. + return; + } + + is_new_vote = is_new; + } + + diff.entry(*slot) + .or_default() + .entry(unduplicated_pubkey) + .and_modify(|seen_in_gossip_previously| { + *seen_in_gossip_previously = *seen_in_gossip_previously || is_gossip_vote + }) + .or_insert(is_gossip_vote); + } + + if is_new_vote { + subscriptions.notify_vote(&vote); + let _ = verified_vote_sender.send((*vote_pubkey, vote.slots)); + } + } + + fn process_votes( + vote_tracker: &VoteTracker, + gossip_vote_txs: Vec, + replayed_votes: Vec, + root_bank: &Bank, + subscriptions: &RpcSubscriptions, + verified_vote_sender: &VerifiedVoteSender, + ) -> Vec<(Slot, Hash)> { + let mut diff: HashMap, bool>> = HashMap::new(); + let mut new_optimistic_confirmed_slots = vec![]; + + // Process votes from gossip and ReplayStage + for (i, (vote_pubkey, vote, _)) in gossip_vote_txs + .iter() + .filter_map(|gossip_tx| { + vote_transaction::parse_vote_transaction(gossip_tx).filter( + |(vote_pubkey, vote, _)| { + if vote.slots.is_empty() { + return false; + } + let last_vote_slot = vote.slots.last().unwrap(); + // Votes from gossip need to be verified as they have not been + // verified by the replay pipeline. Determine the authorized voter + // based on the last vote slot. This will drop votes from authorized + // voters trying to make votes for slots earlier than the epoch for + // which they are authorized + let actual_authorized_voter = + vote_tracker.get_authorized_voter(&vote_pubkey, *last_vote_slot); + + if actual_authorized_voter.is_none() { + return false; + } + + // Voting without the correct authorized pubkey, dump the vote + if !VoteTracker::vote_contains_authorized_voter( + &gossip_tx, + &actual_authorized_voter.unwrap(), + ) { + return false; + } + + true + }, + ) + }) + .chain(replayed_votes) + .enumerate() + { + Self::update_new_votes( + vote, + &vote_pubkey, + &vote_tracker, + root_bank, + subscriptions, + verified_vote_sender, + &mut diff, + &mut new_optimistic_confirmed_slots, + i < gossip_vote_txs.len(), + ); } // Process all the slots accumulated from replay and gossip. @@ -661,13 +703,6 @@ impl ClusterInfoVoteListener { slot_diff.retain(|pubkey, seen_in_gossip_above| { let seen_in_gossip_previously = r_slot_tracker.voted.get(pubkey); let is_new = seen_in_gossip_previously.is_none(); - if is_new && !*seen_in_gossip_above { - // If this vote wasn't seen in gossip, then it must be a - // replay vote, and we haven't sent a notification for - // those yet - let _ = verified_vote_sender.send((**pubkey, vec![slot])); - } - // `is_new_from_gossip` means we observed a vote for this slot // for the first time in gossip let is_new_from_gossip = !seen_in_gossip_previously.cloned().unwrap_or(false) @@ -721,7 +756,8 @@ impl ClusterInfoVoteListener { new_optimistic_confirmed_slots } - // Returns if the slot was optimistically confirmed + // Returns if the slot was optimistically confirmed, and whether + // the slot was new fn add_optimistic_confirmation_vote( vote_tracker: &VoteTracker, slot: Slot, @@ -729,7 +765,7 @@ impl ClusterInfoVoteListener { pubkey: Arc, stake: u64, total_epoch_stake: u64, - ) -> bool { + ) -> (bool, bool) { let slot_tracker = vote_tracker.get_or_insert_slot_tracker(slot); // Insert vote and check for optimistic confirmation let mut w_slot_tracker = slot_tracker.write().unwrap(); @@ -784,16 +820,18 @@ impl ClusterInfoVoteListener { #[cfg(test)] mod tests { use super::*; - use crate::replay_stage::ReplayVotesSender; + use solana_ledger::blockstore_processor::ReplayVotesSender; use solana_perf::packet; use solana_runtime::{ bank::Bank, commitment::BlockCommitmentCache, genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}, }; - use solana_sdk::hash::Hash; - use solana_sdk::signature::Signature; - use solana_sdk::signature::{Keypair, Signer}; + use solana_sdk::{ + hash::Hash, + signature::{Keypair, Signature, Signer}, + }; + use solana_vote_program::vote_state::Vote; use std::collections::BTreeSet; #[test] @@ -1048,7 +1086,7 @@ mod tests { gossip_vote_slots: Vec, replay_vote_slots: Vec, validator_voting_keypairs: &[ValidatorVoteKeypairs], - hash: Option, + switch_proof_hash: Option, votes_sender: &VerifiedVoteTransactionsSender, replay_votes_sender: &ReplayVotesSender, ) { @@ -1062,16 +1100,18 @@ mod tests { node_keypair, vote_keypair, vote_keypair, - hash, + switch_proof_hash, ); votes_sender.send(vec![vote_tx]).unwrap(); - for vote_slot in &replay_vote_slots { - // Send twice, should only expect to be notified once later + let replay_vote = Vote::new(replay_vote_slots.clone(), Hash::default()); + // Send same vote twice, but should only notify once + for _ in 0..2 { replay_votes_sender - .send(Arc::new(vec![(vote_keypair.pubkey(), *vote_slot)])) - .unwrap(); - replay_votes_sender - .send(Arc::new(vec![(vote_keypair.pubkey(), *vote_slot)])) + .send(( + vote_keypair.pubkey(), + replay_vote.clone(), + switch_proof_hash, + )) .unwrap(); } }); @@ -1155,7 +1195,7 @@ mod tests { // the `optimistic` vote set. let optimistic_votes_tracker = r_slot_vote_tracker.optimistic_votes_tracker(&Hash::default()); - if vote_slot == 2 { + if vote_slot == 2 || vote_slot == 4 { let optimistic_votes_tracker = optimistic_votes_tracker.unwrap(); assert!(optimistic_votes_tracker.voted().contains(&pubkey)); assert_eq!( @@ -1269,7 +1309,7 @@ mod tests { } } - fn run_test_process_votes3(hash: Option) { + fn run_test_process_votes3(switch_proof_hash: Option) { let (votes_sender, votes_receiver) = unbounded(); let (verified_vote_sender, _verified_vote_receiver) = unbounded(); let (replay_votes_sender, replay_votes_receiver) = unbounded(); @@ -1288,6 +1328,7 @@ mod tests { vec![2], vec![0, 1, 2], vec![1, 0, 2], + vec![0, 1, 2, 0, 1, 2], ]; for events in ordered_events { let (vote_tracker, bank, validator_voting_keypairs, subscriptions) = setup(); @@ -1303,13 +1344,17 @@ mod tests { node_keypair, vote_keypair, vote_keypair, - hash, + switch_proof_hash, ); votes_sender.send(vec![vote_tx.clone()]).unwrap(); } if e == 1 || e == 2 { replay_votes_sender - .send(Arc::new(vec![(vote_keypair.pubkey(), vote_slot)])) + .send(( + vote_keypair.pubkey(), + Vote::new(vec![vote_slot], Hash::default()), + switch_proof_hash, + )) .unwrap(); } let _ = ClusterInfoVoteListener::get_and_process_votes( @@ -1403,9 +1448,6 @@ mod tests { // SlotVoteTracker.voted, one in SlotVoteTracker.updates, one in // SlotVoteTracker.optimistic_votes_tracker let ref_count_per_vote = 3; - // Replay votes don't get added to `SlotVoteTracker.optimistic_votes_tracker`, - // so there's one less - let ref_count_per_replay_vote = ref_count_per_vote - 1; let ref_count_per_new_key = 1; // Create some voters at genesis @@ -1448,14 +1490,15 @@ mod tests { ClusterInfoVoteListener::process_votes( &vote_tracker, vote_tx, + // Add gossip vote for same slot, should not affect outcome + vec![( + validator0_keypairs.vote_keypair.pubkey(), + Vote::new(vec![voted_slot], Hash::default()), + None, + )], &bank, &subscriptions, &verified_vote_sender, - // Add vote for same slot, should not affect outcome - &[Arc::new(vec![( - validator0_keypairs.vote_keypair.pubkey(), - voted_slot, - )])], ); let ref_count = Arc::strong_count( &vote_tracker @@ -1517,13 +1560,14 @@ mod tests { ClusterInfoVoteListener::process_votes( &vote_tracker, vote_txs, + vec![( + validator_keypairs[1].vote_keypair.pubkey(), + Vote::new(vec![first_slot_in_new_epoch], Hash::default()), + None, + )], &new_root_bank, &subscriptions, &verified_vote_sender, - &[Arc::new(vec![( - validator_keypairs[1].vote_keypair.pubkey(), - first_slot_in_new_epoch, - )])], ); // Check new replay vote pubkey first @@ -1540,7 +1584,7 @@ mod tests { // `ref_count_per_optimistic_vote + ref_count_per_new_key`. // +ref_count_per_new_key for the new pubkey in `vote_tracker.keys` and // +ref_count_per_optimistic_vote for the one new vote - assert_eq!(ref_count, ref_count_per_replay_vote + ref_count_per_new_key); + assert_eq!(ref_count, ref_count_per_vote + ref_count_per_new_key); // Check the existing pubkey let ref_count = Arc::strong_count( diff --git a/core/src/consensus.rs b/core/src/consensus.rs index c0d9138bd5..7b00538eb4 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -679,7 +679,6 @@ pub mod test { progress_map::ForkProgress, replay_stage::{HeaviestForkFailures, ReplayStage}, }; - use crossbeam_channel::unbounded; use solana_runtime::{ bank::Bank, bank_forks::BankForks, @@ -795,7 +794,6 @@ pub mod test { .cloned() .collect(); - let (replay_slot_sender, _replay_slot_receiver) = unbounded(); let _ = ReplayStage::compute_bank_stats( &my_pubkey, &ancestors, @@ -808,7 +806,6 @@ pub mod test { &mut PubkeyReferences::default(), &mut self.heaviest_subtree_fork_choice, &mut BankWeightForkChoice::default(), - &replay_slot_sender, ); let vote_bank = self diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 55c47a0404..39a7561518 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -7,7 +7,7 @@ use crate::{ cluster_info_vote_listener::VoteTracker, cluster_slots::ClusterSlots, commitment_service::{AggregateCommitmentService, CommitmentAggregationData}, - consensus::{ComputedBankState, PubkeyVotes, Stake, SwitchForkDecision, Tower, VotedStakes}, + consensus::{ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes}, fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, @@ -18,11 +18,12 @@ use crate::{ rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, }; -use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use solana_ledger::{ block_error::BlockError, blockstore::Blockstore, - blockstore_processor::{self, BlockstoreProcessorError, TransactionStatusSender}, + blockstore_processor::{ + self, BlockstoreProcessorError, ReplayVotesSender, TransactionStatusSender, + }, entry::VerifyRecyclers, leader_schedule_cache::LeaderScheduleCache, }; @@ -62,9 +63,6 @@ pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64; pub const MAX_UNCONFIRMED_SLOTS: usize = 5; -pub type ReplayVotesSender = CrossbeamSender>; -pub type ReplayVotesReceiver = CrossbeamReceiver>; - #[derive(PartialEq, Debug)] pub(crate) enum HeaviestForkFailures { LockedOut(u64), @@ -346,6 +344,7 @@ impl ReplayStage { &verify_recyclers, &mut heaviest_subtree_fork_choice, &subscriptions, + &replay_votes_sender, ); replay_active_banks_time.stop(); Self::report_memory(&allocated, "replay_active_banks", start); @@ -392,7 +391,6 @@ impl ReplayStage { &mut all_pubkeys, &mut heaviest_subtree_fork_choice, &mut bank_weight_fork_choice, - &replay_votes_sender, ); compute_bank_stats_time.stop(); @@ -942,6 +940,7 @@ impl ReplayStage { blockstore: &Blockstore, bank_progress: &mut ForkProgress, transaction_status_sender: Option, + replay_votes_sender: &ReplayVotesSender, verify_recyclers: &VerifyRecyclers, ) -> result::Result { let tx_count_before = bank_progress.replay_progress.num_txs; @@ -952,6 +951,7 @@ impl ReplayStage { &mut bank_progress.replay_progress, false, transaction_status_sender, + Some(replay_votes_sender), None, verify_recyclers, ); @@ -1203,6 +1203,7 @@ impl ReplayStage { ); } + #[allow(clippy::too_many_arguments)] fn replay_active_banks( blockstore: &Arc, bank_forks: &Arc>, @@ -1213,6 +1214,7 @@ impl ReplayStage { verify_recyclers: &VerifyRecyclers, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, subscriptions: &Arc, + replay_votes_sender: &ReplayVotesSender, ) -> bool { let mut did_complete_bank = false; let mut tx_count = 0; @@ -1258,6 +1260,7 @@ impl ReplayStage { &blockstore, bank_progress, transaction_status_sender.clone(), + replay_votes_sender, verify_recyclers, ); match replay_result { @@ -1309,7 +1312,6 @@ impl ReplayStage { all_pubkeys: &mut PubkeyReferences, heaviest_subtree_fork_choice: &mut dyn ForkChoice, bank_weight_fork_choice: &mut dyn ForkChoice, - replay_votes_sender: &ReplayVotesSender, ) -> Vec { frozen_banks.sort_by_key(|bank| bank.slot()); let mut new_stats = vec![]; @@ -1333,7 +1335,6 @@ impl ReplayStage { ); // Notify any listeners of the votes found in this newly computed // bank - let _ = replay_votes_sender.send(computed_bank_state.pubkey_votes.clone()); heaviest_subtree_fork_choice.compute_bank_stats( &bank, tower, @@ -2414,6 +2415,7 @@ pub(crate) mod tests { F: Fn(&Keypair, Arc) -> Vec, { let ledger_path = get_tmp_ledger_path!(); + let (replay_votes_sender, _replay_votes_receiver) = unbounded(); let res = { let blockstore = Arc::new( Blockstore::open(&ledger_path) @@ -2438,6 +2440,7 @@ pub(crate) mod tests { &blockstore, &mut bank0_progress, None, + &replay_votes_sender, &VerifyRecyclers::default(), ); @@ -2601,6 +2604,7 @@ pub(crate) mod tests { blockstore.set_roots(&[slot]).unwrap(); let (transaction_status_sender, transaction_status_receiver) = unbounded(); + let (replay_votes_sender, _replay_votes_receiver) = unbounded(); let transaction_status_service = TransactionStatusService::new( transaction_status_receiver, blockstore, @@ -2614,6 +2618,7 @@ pub(crate) mod tests { &entries, true, Some(transaction_status_sender), + Some(&replay_votes_sender), ); transaction_status_service.join().unwrap(); @@ -2724,7 +2729,6 @@ pub(crate) mod tests { .cloned() .collect(); let tower = Tower::new_for_tests(0, 0.67); - let (replay_votes_sender, replay_votes_receiver) = unbounded(); let newly_computed = ReplayStage::compute_bank_stats( &node_pubkey, &ancestors, @@ -2737,11 +2741,9 @@ pub(crate) mod tests { &mut PubkeyReferences::default(), &mut heaviest_subtree_fork_choice, &mut BankWeightForkChoice::default(), - &replay_votes_sender, ); // bank 0 has no votes, should not send any votes on the channel - assert_eq!(replay_votes_receiver.try_recv().unwrap(), Arc::new(vec![])); assert_eq!(newly_computed, vec![0]); // The only vote is in bank 1, and bank_forks does not currently contain @@ -2785,15 +2787,9 @@ pub(crate) mod tests { &mut PubkeyReferences::default(), &mut heaviest_subtree_fork_choice, &mut BankWeightForkChoice::default(), - &replay_votes_sender, ); - // Bank 1 had one vote, ensure that `compute_bank_stats` notifies listeners - // via `replay_votes_receiver`. - assert_eq!( - replay_votes_receiver.try_recv().unwrap(), - Arc::new(vec![(my_keypairs.vote_keypair.pubkey(), 0)]) - ); + // Bank 1 had one vote assert_eq!(newly_computed, vec![1]); { let fork_progress = progress.get(&1).unwrap(); @@ -2827,10 +2823,8 @@ pub(crate) mod tests { &mut PubkeyReferences::default(), &mut heaviest_subtree_fork_choice, &mut BankWeightForkChoice::default(), - &replay_votes_sender, ); // No new stats should have been computed - assert!(replay_votes_receiver.try_iter().next().is_none()); assert!(newly_computed.is_empty()); } @@ -2855,7 +2849,6 @@ pub(crate) mod tests { .collect(); let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors(); - let (replay_votes_sender, _replay_votes_receiver) = unbounded(); ReplayStage::compute_bank_stats( &node_pubkey, &ancestors, @@ -2868,7 +2861,6 @@ pub(crate) mod tests { &mut PubkeyReferences::default(), &mut heaviest_subtree_fork_choice, &mut BankWeightForkChoice::default(), - &replay_votes_sender, ); assert_eq!( @@ -2931,7 +2923,6 @@ pub(crate) mod tests { .cloned() .collect(); - let (replay_votes_sender, _replay_votes_receiver) = unbounded(); ReplayStage::compute_bank_stats( &node_pubkey, &vote_simulator.bank_forks.read().unwrap().ancestors(), @@ -2944,7 +2935,6 @@ pub(crate) mod tests { &mut PubkeyReferences::default(), &mut vote_simulator.heaviest_subtree_fork_choice, &mut BankWeightForkChoice::default(), - &replay_votes_sender, ); frozen_banks.sort_by_key(|bank| bank.slot()); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 0e4ffb0e3a..72f2901f44 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -8,13 +8,15 @@ use crate::{ cluster_info_vote_listener::{ClusterInfoVoteListener, VerifiedVoteSender, VoteTracker}, fetch_stage::FetchStage, poh_recorder::{PohRecorder, WorkingBankEntry}, - replay_stage::ReplayVotesReceiver, rpc_subscriptions::RpcSubscriptions, sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, }; use crossbeam_channel::unbounded; -use solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender}; +use solana_ledger::{ + blockstore::Blockstore, + blockstore_processor::{ReplayVotesReceiver, TransactionStatusSender}, +}; use solana_runtime::bank_forks::BankForks; use std::{ net::UdpSocket, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index e229002f41..3531f4ab21 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -10,7 +10,7 @@ use crate::{ cluster_slots::ClusterSlots, ledger_cleanup_service::LedgerCleanupService, poh_recorder::PohRecorder, - replay_stage::{ReplayStage, ReplayStageConfig, ReplayVotesSender}, + replay_stage::{ReplayStage, ReplayStageConfig}, retransmit_stage::RetransmitStage, rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, @@ -21,7 +21,7 @@ use crate::{ use crossbeam_channel::unbounded; use solana_ledger::{ blockstore::{Blockstore, CompletedSlotsReceiver}, - blockstore_processor::TransactionStatusSender, + blockstore_processor::{ReplayVotesSender, TransactionStatusSender}, leader_schedule_cache::LeaderScheduleCache, }; use solana_runtime::{ diff --git a/core/src/validator.rs b/core/src/validator.rs index 9167b646c4..e59311bb84 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -27,7 +27,7 @@ use solana_ledger::{ bank_forks_utils, blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType}, blockstore_db::BlockstoreRecoveryMode, - blockstore_processor::{self, TransactionStatusSender}, + blockstore_processor::{self, ReplayVotesSender, TransactionStatusSender}, create_new_tmp_ledger, leader_schedule::FixedSchedule, leader_schedule_cache::LeaderScheduleCache, @@ -223,6 +223,7 @@ impl Validator { validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed))); let validator_exit = Arc::new(RwLock::new(Some(validator_exit))); + let (replay_votes_sender, replay_votes_receiver) = unbounded(); let ( genesis_config, bank_forks, @@ -237,7 +238,7 @@ impl Validator { rewards_recorder_sender, rewards_recorder_service, }, - ) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit); + ) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit, &replay_votes_sender); let leader_schedule_cache = Arc::new(leader_schedule_cache); let bank = bank_forks.working_bank(); @@ -407,7 +408,6 @@ impl Validator { let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded(); - let (replay_votes_sender, replay_votes_receiver) = unbounded(); let tvu = Tvu::new( vote_account, authorized_voter_keypairs, @@ -574,6 +574,7 @@ fn new_banks_from_ledger( ledger_path: &Path, poh_verify: bool, exit: &Arc, + replay_votes_sender: &ReplayVotesSender, ) -> ( GenesisConfig, BankForks, @@ -635,6 +636,7 @@ fn new_banks_from_ledger( transaction_history_services .transaction_status_sender .clone(), + Some(&replay_votes_sender), ) .unwrap_or_else(|err| { error!("Failed to load ledger: {:?}", err); diff --git a/core/src/vote_stake_tracker.rs b/core/src/vote_stake_tracker.rs index ac1b20c68c..a6cb807c4b 100644 --- a/core/src/vote_stake_tracker.rs +++ b/core/src/vote_stake_tracker.rs @@ -9,23 +9,29 @@ pub struct VoteStakeTracker { } impl VoteStakeTracker { - // Returns true if the stake that has voted has just crosssed the supermajority + // Returns tuple (is_confirmed, is_new) where + // `is_confirmed` is true if the stake that has voted has just crosssed the supermajority // of stake + // `is_new` is true if the vote has not been seen before pub fn add_vote_pubkey( &mut self, vote_pubkey: Arc, stake: u64, total_epoch_stake: u64, - ) -> bool { - if !self.voted.contains(&vote_pubkey) { + ) -> (bool, bool) { + let is_new = !self.voted.contains(&vote_pubkey); + if is_new { self.voted.insert(vote_pubkey); let ratio_before = self.stake as f64 / total_epoch_stake as f64; self.stake += stake; let ratio_now = self.stake as f64 / total_epoch_stake as f64; - ratio_before <= VOTE_THRESHOLD_SIZE && ratio_now > VOTE_THRESHOLD_SIZE + ( + ratio_before <= VOTE_THRESHOLD_SIZE && ratio_now > VOTE_THRESHOLD_SIZE, + is_new, + ) } else { - false + (false, is_new) } } @@ -48,21 +54,26 @@ mod test { let mut vote_stake_tracker = VoteStakeTracker::default(); for i in 0..10 { let pubkey = Arc::new(Pubkey::new_rand()); - let res = vote_stake_tracker.add_vote_pubkey(pubkey.clone(), 1, total_epoch_stake); + let (is_confirmed, is_new) = + vote_stake_tracker.add_vote_pubkey(pubkey.clone(), 1, total_epoch_stake); let stake = vote_stake_tracker.stake(); - vote_stake_tracker.add_vote_pubkey(pubkey.clone(), 1, total_epoch_stake); + let (is_confirmed2, is_new2) = + vote_stake_tracker.add_vote_pubkey(pubkey.clone(), 1, total_epoch_stake); let stake2 = vote_stake_tracker.stake(); // Stake should not change from adding same pubkey twice assert_eq!(stake, stake2); + assert!(!is_confirmed2); + assert!(!is_new2); // at i == 7, the voted stake is 70%, which is the first time crossing // the supermajority threshold if i == 6 { - assert!(res); + assert!(is_confirmed); } else { - assert!(!res); + assert!(!is_confirmed); } + assert!(is_new); } } } diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index e3e0944bf4..83ca891e35 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -686,6 +686,7 @@ fn load_bank_forks( snapshot_config.as_ref(), process_options, None, + None, ) } diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 4002b7898d..96f93d781f 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -2,7 +2,7 @@ use crate::{ blockstore::Blockstore, blockstore_processor::{ self, BlockstoreProcessorError, BlockstoreProcessorResult, ProcessOptions, - TransactionStatusSender, + ReplayVotesSender, TransactionStatusSender, }, entry::VerifyRecyclers, leader_schedule_cache::LeaderScheduleCache, @@ -36,6 +36,7 @@ pub fn load( snapshot_config: Option<&SnapshotConfig>, process_options: ProcessOptions, transaction_status_sender: Option, + replay_votes_sender: Option<&ReplayVotesSender>, ) -> LoadResult { if let Some(snapshot_config) = snapshot_config.as_ref() { info!( @@ -89,6 +90,7 @@ pub fn load( &process_options, &VerifyRecyclers::default(), transaction_status_sender, + replay_votes_sender, ), Some(deserialized_snapshot_hash), ); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index d8e4b205f0..972dcc1ddd 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -6,7 +6,7 @@ use crate::{ entry::{create_ticks, Entry, EntrySlice, EntryVerificationStatus, VerifyRecyclers}, leader_schedule_cache::LeaderScheduleCache, }; -use crossbeam_channel::Sender; +use crossbeam_channel::{Receiver, Sender}; use itertools::Itertools; use log::*; use rand::{seq::SliceRandom, thread_rng}; @@ -29,6 +29,7 @@ use solana_sdk::{ timing::duration_as_ms, transaction::{Result, Transaction, TransactionError}, }; +use solana_vote_program::{vote_state::Vote, vote_transaction}; use std::{ cell::RefCell, collections::HashMap, @@ -42,6 +43,10 @@ use thiserror::Error; pub type BlockstoreProcessorResult = result::Result<(BankForks, LeaderScheduleCache), BlockstoreProcessorError>; +pub type ReplayedVote = (Pubkey, Vote, Option); +pub type ReplayVotesSender = Sender; +pub type ReplayVotesReceiver = Receiver; + thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) .thread_name(|ix| format!("blockstore_processor_{}", ix)) @@ -93,6 +98,7 @@ fn execute_batch( batch: &TransactionBatch, bank: &Arc, transaction_status_sender: Option, + replay_votes_sender: Option<&ReplayVotesSender>, ) -> Result<()> { let ( TransactionResults { @@ -106,6 +112,19 @@ fn execute_batch( transaction_status_sender.is_some(), ); + if let Some(replay_votes_sender) = replay_votes_sender { + for (transaction, (processing_result, _)) in + OrderedIterator::new(batch.transactions(), batch.iteration_order()) + .zip(&processing_results) + { + if processing_result.is_ok() { + if let Some(parsed_vote) = vote_transaction::parse_vote_transaction(transaction) { + let _ = replay_votes_sender.send(parsed_vote); + } + } + } + } + if let Some(sender) = transaction_status_sender { send_transaction_status_batch( bank.clone(), @@ -126,6 +145,7 @@ fn execute_batches( batches: &[TransactionBatch], entry_callback: Option<&ProcessCallback>, transaction_status_sender: Option, + replay_votes_sender: Option<&ReplayVotesSender>, ) -> Result<()> { inc_new_counter_debug!("bank-par_execute_entries-count", batches.len()); let results: Vec> = PAR_THREAD_POOL.with(|thread_pool| { @@ -133,7 +153,7 @@ fn execute_batches( batches .into_par_iter() .map_with(transaction_status_sender, |sender, batch| { - let result = execute_batch(batch, bank, sender.clone()); + let result = execute_batch(batch, bank, sender.clone(), replay_votes_sender); if let Some(entry_callback) = entry_callback { entry_callback(bank); } @@ -156,8 +176,16 @@ pub fn process_entries( entries: &[Entry], randomize: bool, transaction_status_sender: Option, + replay_votes_sender: Option<&ReplayVotesSender>, ) -> Result<()> { - process_entries_with_callback(bank, entries, randomize, None, transaction_status_sender) + process_entries_with_callback( + bank, + entries, + randomize, + None, + transaction_status_sender, + replay_votes_sender, + ) } fn process_entries_with_callback( @@ -166,6 +194,7 @@ fn process_entries_with_callback( randomize: bool, entry_callback: Option<&ProcessCallback>, transaction_status_sender: Option, + replay_votes_sender: Option<&ReplayVotesSender>, ) -> Result<()> { // accumulator for entries that can be processed in parallel let mut batches = vec![]; @@ -182,6 +211,7 @@ fn process_entries_with_callback( &batches, entry_callback, transaction_status_sender.clone(), + replay_votes_sender, )?; batches.clear(); for hash in &tick_hashes { @@ -237,12 +267,19 @@ fn process_entries_with_callback( &batches, entry_callback, transaction_status_sender.clone(), + replay_votes_sender, )?; batches.clear(); } } } - execute_batches(bank, &batches, entry_callback, transaction_status_sender)?; + execute_batches( + bank, + &batches, + entry_callback, + transaction_status_sender, + replay_votes_sender, + )?; for hash in tick_hashes { bank.register_tick(&hash); } @@ -308,7 +345,15 @@ pub fn process_blockstore( info!("processing ledger for slot 0..."); let recyclers = VerifyRecyclers::default(); process_bank_0(&bank0, blockstore, &opts, &recyclers)?; - process_blockstore_from_root(genesis_config, blockstore, bank0, &opts, &recyclers, None) + process_blockstore_from_root( + genesis_config, + blockstore, + bank0, + &opts, + &recyclers, + None, + None, + ) } // Process blockstore from a known root bank @@ -319,6 +364,7 @@ pub fn process_blockstore_from_root( opts: &ProcessOptions, recyclers: &VerifyRecyclers, transaction_status_sender: Option, + replay_votes_sender: Option<&ReplayVotesSender>, ) -> BlockstoreProcessorResult { info!("processing ledger from slot {}...", bank.slot()); let allocated = thread_mem_usage::Allocatedp::default(); @@ -384,6 +430,7 @@ pub fn process_blockstore_from_root( opts, recyclers, transaction_status_sender, + replay_votes_sender, )?; (initial_forks, leader_schedule_cache) } else { @@ -473,6 +520,7 @@ fn confirm_full_slot( recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, transaction_status_sender: Option, + replay_votes_sender: Option<&ReplayVotesSender>, ) -> result::Result<(), BlockstoreProcessorError> { let mut timing = ConfirmationTiming::default(); let skip_verification = !opts.poh_verify; @@ -483,6 +531,7 @@ fn confirm_full_slot( progress, skip_verification, transaction_status_sender, + replay_votes_sender, opts.entry_callback.as_ref(), recyclers, )?; @@ -543,6 +592,7 @@ pub fn confirm_slot( progress: &mut ConfirmationProgress, skip_verification: bool, transaction_status_sender: Option, + replay_votes_sender: Option<&ReplayVotesSender>, entry_callback: Option<&ProcessCallback>, recyclers: &VerifyRecyclers, ) -> result::Result<(), BlockstoreProcessorError> { @@ -610,6 +660,7 @@ pub fn confirm_slot( true, entry_callback, transaction_status_sender, + replay_votes_sender, ) .map_err(BlockstoreProcessorError::from); replay_elapsed.stop(); @@ -646,8 +697,16 @@ fn process_bank_0( ) -> result::Result<(), BlockstoreProcessorError> { assert_eq!(bank0.slot(), 0); let mut progress = ConfirmationProgress::new(bank0.last_blockhash()); - confirm_full_slot(blockstore, bank0, opts, recyclers, &mut progress, None) - .expect("processing for bank 0 must succeed"); + confirm_full_slot( + blockstore, + bank0, + opts, + recyclers, + &mut progress, + None, + None, + ) + .expect("processing for bank 0 must succeed"); bank0.freeze(); Ok(()) } @@ -720,6 +779,7 @@ fn load_frozen_forks( opts: &ProcessOptions, recyclers: &VerifyRecyclers, transaction_status_sender: Option, + replay_votes_sender: Option<&ReplayVotesSender>, ) -> result::Result>, BlockstoreProcessorError> { let mut initial_forks = HashMap::new(); let mut last_status_report = Instant::now(); @@ -766,6 +826,7 @@ fn load_frozen_forks( recyclers, &mut progress, transaction_status_sender.clone(), + replay_votes_sender, ) .is_err() { @@ -816,10 +877,11 @@ fn process_single_slot( recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, transaction_status_sender: Option, + replay_votes_sender: Option<&ReplayVotesSender>, ) -> result::Result<(), BlockstoreProcessorError> { // Mark corrupt slots as dead so validators don't replay this slot and // see DuplicateSignature errors later in ReplayStage - confirm_full_slot(blockstore, bank, opts, recyclers, progress, transaction_status_sender).map_err(|err| { + confirm_full_slot(blockstore, bank, opts, recyclers, progress, transaction_status_sender, replay_votes_sender).map_err(|err| { let slot = bank.slot(); warn!("slot {} failed to verify: {}", slot, err); if blockstore.is_primary_access() { @@ -909,8 +971,12 @@ pub mod tests { create_genesis_config, create_genesis_config_with_leader, GenesisConfigInfo, }, }; + use crossbeam_channel::unbounded; use matches::assert_matches; use rand::{thread_rng, Rng}; + use solana_runtime::genesis_utils::{ + create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs, + }; use solana_sdk::account::Account; use solana_sdk::{ epoch_schedule::EpochSchedule, @@ -921,7 +987,7 @@ pub mod tests { system_transaction, transaction::{Transaction, TransactionError}, }; - use std::sync::RwLock; + use std::{collections::BTreeSet, sync::RwLock}; #[test] fn test_process_blockstore_with_missing_hashes() { @@ -1593,7 +1659,7 @@ pub mod tests { ); // Now ensure the TX is accepted despite pointing to the ID of an empty entry. - process_entries(&bank, &slot_entries, true, None).unwrap(); + process_entries(&bank, &slot_entries, true, None, None).unwrap(); assert_eq!(bank.process_transaction(&tx), Ok(())); } @@ -1798,7 +1864,7 @@ pub mod tests { // ensure bank can process a tick assert_eq!(bank.tick_height(), 0); let tick = next_entry(&genesis_config.hash(), 1, vec![]); - assert_eq!(process_entries(&bank, &[tick], true, None), Ok(())); + assert_eq!(process_entries(&bank, &[tick], true, None, None), Ok(())); assert_eq!(bank.tick_height(), 1); } @@ -1831,7 +1897,7 @@ pub mod tests { ); let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]); assert_eq!( - process_entries(&bank, &[entry_1, entry_2], true, None), + process_entries(&bank, &[entry_1, entry_2], true, None, None), Ok(()) ); assert_eq!(bank.get_balance(&keypair1.pubkey()), 2); @@ -1891,7 +1957,8 @@ pub mod tests { &bank, &[entry_1_to_mint, entry_2_to_3_mint_to_1], false, - None + None, + None, ), Ok(()) ); @@ -1963,6 +2030,7 @@ pub mod tests { &[entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()], false, None, + None, ) .is_err()); @@ -2074,6 +2142,7 @@ pub mod tests { ], false, None, + None, ) .is_err()); @@ -2121,7 +2190,7 @@ pub mod tests { system_transaction::transfer(&keypair2, &keypair4.pubkey(), 1, bank.last_blockhash()); let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]); assert_eq!( - process_entries(&bank, &[entry_1, entry_2], true, None), + process_entries(&bank, &[entry_1, entry_2], true, None, None), Ok(()) ); assert_eq!(bank.get_balance(&keypair3.pubkey()), 1); @@ -2181,7 +2250,7 @@ pub mod tests { next_entry_mut(&mut hash, 0, transactions) }) .collect(); - assert_eq!(process_entries(&bank, &entries, true, None), Ok(())); + assert_eq!(process_entries(&bank, &entries, true, None, None), Ok(())); } #[test] @@ -2241,7 +2310,7 @@ pub mod tests { // Transfer lamports to each other let entry = next_entry(&bank.last_blockhash(), 1, tx_vector); - assert_eq!(process_entries(&bank, &[entry], true, None), Ok(())); + assert_eq!(process_entries(&bank, &[entry], true, None, None), Ok(())); bank.squash(); // Even number keypair should have balance of 2 * initial_lamports and @@ -2299,7 +2368,7 @@ pub mod tests { system_transaction::transfer(&keypair1, &keypair4.pubkey(), 1, bank.last_blockhash()); let entry_2 = next_entry(&tick.hash, 1, vec![tx]); assert_eq!( - process_entries(&bank, &[entry_1, tick, entry_2.clone()], true, None), + process_entries(&bank, &[entry_1, tick, entry_2.clone()], true, None, None), Ok(()) ); assert_eq!(bank.get_balance(&keypair3.pubkey()), 1); @@ -2310,7 +2379,7 @@ pub mod tests { system_transaction::transfer(&keypair2, &keypair3.pubkey(), 1, bank.last_blockhash()); let entry_3 = next_entry(&entry_2.hash, 1, vec![tx]); assert_eq!( - process_entries(&bank, &[entry_3], true, None), + process_entries(&bank, &[entry_3], true, None, None), Err(TransactionError::AccountNotFound) ); } @@ -2390,7 +2459,7 @@ pub mod tests { ); assert_eq!( - process_entries(&bank, &[entry_1_to_mint], false, None), + process_entries(&bank, &[entry_1_to_mint], false, None, None), Err(TransactionError::AccountInUse) ); @@ -2450,6 +2519,7 @@ pub mod tests { &recyclers, &mut ConfirmationProgress::new(bank0.last_blockhash()), None, + None, ) .unwrap(); bank1.squash(); @@ -2462,6 +2532,7 @@ pub mod tests { &opts, &recyclers, None, + None, ) .unwrap(); @@ -2543,7 +2614,7 @@ pub mod tests { }) .collect(); info!("paying iteration {}", i); - process_entries(&bank, &entries, true, None).expect("paying failed"); + process_entries(&bank, &entries, true, None, None).expect("paying failed"); let entries: Vec<_> = (0..NUM_TRANSFERS) .step_by(NUM_TRANSFERS_PER_ENTRY) @@ -2566,7 +2637,7 @@ pub mod tests { .collect(); info!("refunding iteration {}", i); - process_entries(&bank, &entries, true, None).expect("refunding failed"); + process_entries(&bank, &entries, true, None, None).expect("refunding failed"); // advance to next block process_entries( @@ -2576,6 +2647,7 @@ pub mod tests { .collect::>(), true, None, + None, ) .expect("process ticks failed"); @@ -2618,7 +2690,7 @@ pub mod tests { let entry = next_entry(&new_blockhash, 1, vec![tx]); entries.push(entry); - process_entries_with_callback(&bank0, &entries, true, None, None).unwrap(); + process_entries_with_callback(&bank0, &entries, true, None, None, None).unwrap(); assert_eq!(bank0.get_balance(&keypair.pubkey()), 1) } @@ -2699,4 +2771,81 @@ pub mod tests { assert_eq!(err.unwrap_err(), TransactionError::AccountNotFound); assert_eq!(signature, account_not_found_sig); } + + #[test] + fn test_replay_vote_sender() { + let validator_keypairs: Vec<_> = + (0..10).map(|_| ValidatorVoteKeypairs::new_rand()).collect(); + let GenesisConfigInfo { + genesis_config, + voting_keypair: _, + .. + } = create_genesis_config_with_vote_accounts( + 1_000_000_000, + &validator_keypairs, + vec![100; validator_keypairs.len()], + ); + let bank0 = Arc::new(Bank::new(&genesis_config)); + bank0.freeze(); + + let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::new_rand(), 1)); + + // The new blockhash is going to be the hash of the last tick in the block + let bank_1_blockhash = bank1.last_blockhash(); + + // Create an transaction that references the new blockhash, should still + // be able to find the blockhash if we process transactions all in the same + // batch + let mut expected_successful_voter_pubkeys = BTreeSet::new(); + let vote_txs: Vec<_> = validator_keypairs + .iter() + .enumerate() + .map(|(i, validator_keypairs)| { + if i % 3 == 0 { + // These votes are correct + expected_successful_voter_pubkeys + .insert(validator_keypairs.vote_keypair.pubkey()); + vote_transaction::new_vote_transaction( + vec![0], + bank0.hash(), + bank_1_blockhash, + &validator_keypairs.node_keypair, + &validator_keypairs.vote_keypair, + &validator_keypairs.vote_keypair, + None, + ) + } else if i % 3 == 1 { + // These have the wrong authorized voter + vote_transaction::new_vote_transaction( + vec![0], + bank0.hash(), + bank_1_blockhash, + &validator_keypairs.node_keypair, + &validator_keypairs.vote_keypair, + &Keypair::new(), + None, + ) + } else { + // These have an invalid vote for non-existent bank 2 + vote_transaction::new_vote_transaction( + vec![bank1.slot() + 1], + bank0.hash(), + bank_1_blockhash, + &validator_keypairs.node_keypair, + &validator_keypairs.vote_keypair, + &validator_keypairs.vote_keypair, + None, + ) + } + }) + .collect(); + let entry = next_entry(&bank_1_blockhash, 1, vote_txs); + let (replay_votes_sender, replay_votes_receiver) = unbounded(); + let _ = process_entries(&bank1, &[entry], true, None, Some(&replay_votes_sender)); + let successes: BTreeSet = replay_votes_receiver + .try_iter() + .map(|(vote_pubkey, _, _)| vote_pubkey) + .collect(); + assert_eq!(successes, expected_successful_voter_pubkeys); + } }