diff --git a/Cargo.lock b/Cargo.lock index 26e4d352bc..51ebc36491 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4134,6 +4134,7 @@ dependencies = [ "bv", "byteorder", "bzip2", + "crossbeam-channel", "dir-diff", "flate2", "fnv", diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 66a6d8146b..9da85ae9ef 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -167,6 +167,7 @@ fn main() { let (verified_sender, verified_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded(); + let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let bank0 = Bank::new(&genesis_config); let mut bank_forks = BankForks::new(bank0); let mut bank = bank_forks.working_bank(); @@ -224,6 +225,7 @@ fn main() { verified_receiver, vote_receiver, None, + replay_vote_sender, ); poh_recorder.lock().unwrap().set_bank(&bank); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 62c46ed6dd..0004ce92af 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -73,6 +73,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let batch_len = batch.packets.len(); packets.push((batch, vec![0usize; batch_len])); } + let (s, _r) = unbounded(); // This tests the performance of buffering packets. // If the packet buffers are copied, performance will be poor. bencher.iter(move || { @@ -82,6 +83,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { &mut packets, 10_000, None, + &s, ); }); @@ -190,12 +192,14 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { create_test_recorder(&bank, &blockstore, None); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(cluster_info); + let (s, _r) = unbounded(); let _banking_stage = BankingStage::new( &cluster_info, &poh_recorder, verified_receiver, vote_receiver, None, + s, ); poh_recorder.lock().unwrap().set_bank(&bank); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 1df5af4fba..bf24b57a4e 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -24,7 +24,9 @@ use solana_perf::{ use solana_runtime::{ accounts_db::ErrorCounters, bank::{Bank, TransactionBalancesSet, TransactionProcessResult}, + bank_utils, transaction_batch::TransactionBatch, + vote_sender_types::ReplayVoteSender, }; use solana_sdk::{ clock::{ @@ -81,6 +83,7 @@ impl BankingStage { verified_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, transaction_status_sender: Option, + gossip_vote_sender: ReplayVoteSender, ) -> Self { Self::new_num_threads( cluster_info, @@ -89,6 +92,7 @@ impl BankingStage { verified_vote_receiver, Self::num_threads(), transaction_status_sender, + gossip_vote_sender, ) } @@ -99,6 +103,7 @@ impl BankingStage { verified_vote_receiver: CrossbeamReceiver>, num_threads: u32, transaction_status_sender: Option, + gossip_vote_sender: ReplayVoteSender, ) -> Self { let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH); // Single thread to generate entries from many banks. @@ -119,6 +124,7 @@ impl BankingStage { let cluster_info = cluster_info.clone(); let mut recv_start = Instant::now(); let transaction_status_sender = transaction_status_sender.clone(); + let gossip_vote_sender = gossip_vote_sender.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -132,7 +138,8 @@ impl BankingStage { enable_forwarding, i, batch_limit, - transaction_status_sender.clone(), + transaction_status_sender, + gossip_vote_sender, ); }) .unwrap() @@ -168,6 +175,7 @@ impl BankingStage { buffered_packets: &mut Vec, batch_limit: usize, transaction_status_sender: Option, + gossip_vote_sender: &ReplayVoteSender, ) -> UnprocessedPackets { let mut unprocessed_packets = vec![]; let mut rebuffered_packets = 0; @@ -199,6 +207,7 @@ impl BankingStage { &msgs, unprocessed_indexes.to_owned(), transaction_status_sender.clone(), + gossip_vote_sender, ); new_tx_count += processed; @@ -283,6 +292,7 @@ impl BankingStage { ) } + #[allow(clippy::too_many_arguments)] fn process_buffered_packets( my_pubkey: &Pubkey, socket: &std::net::UdpSocket, @@ -292,6 +302,7 @@ impl BankingStage { enable_forwarding: bool, batch_limit: usize, transaction_status_sender: Option, + gossip_vote_sender: &ReplayVoteSender, ) -> BufferedPacketsDecision { let (leader_at_slot_offset, poh_has_bank, would_be_leader) = { let poh = poh_recorder.lock().unwrap(); @@ -319,6 +330,7 @@ impl BankingStage { buffered_packets, batch_limit, transaction_status_sender, + gossip_vote_sender, ); buffered_packets.append(&mut unprocessed); } @@ -352,6 +364,7 @@ impl BankingStage { decision } + #[allow(clippy::too_many_arguments)] pub fn process_loop( my_pubkey: Pubkey, verified_receiver: &CrossbeamReceiver>, @@ -362,6 +375,7 @@ impl BankingStage { id: u32, batch_limit: usize, transaction_status_sender: Option, + gossip_vote_sender: ReplayVoteSender, ) { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = vec![]; @@ -376,6 +390,7 @@ impl BankingStage { enable_forwarding, batch_limit, transaction_status_sender.clone(), + &gossip_vote_sender, ); if decision == BufferedPacketsDecision::Hold { // If we are waiting on a new bank, @@ -403,6 +418,7 @@ impl BankingStage { id, batch_limit, transaction_status_sender.clone(), + &gossip_vote_sender, ) { Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => break, @@ -501,6 +517,7 @@ impl BankingStage { poh: &Arc>, batch: &TransactionBatch, transaction_status_sender: Option, + gossip_vote_sender: &ReplayVoteSender, ) -> (Result, Vec) { let mut load_execute_time = Measure::start("load_execute_time"); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce @@ -533,24 +550,23 @@ impl BankingStage { let num_to_commit = num_to_commit.unwrap(); if num_to_commit != 0 { - let transaction_statuses = bank - .commit_transactions( - txs, - None, - &mut loaded_accounts, - &results, - tx_count, - signature_count, - ) - .processing_results; + let tx_results = bank.commit_transactions( + txs, + None, + &mut loaded_accounts, + &results, + tx_count, + signature_count, + ); + bank_utils::find_and_send_votes(txs, &tx_results, Some(gossip_vote_sender)); if let Some(sender) = transaction_status_sender { let post_balances = bank.collect_balances(batch); send_transaction_status_batch( bank.clone(), batch.transactions(), batch.iteration_order_vec(), - transaction_statuses, + tx_results.processing_results, TransactionBalancesSet::new(pre_balances, post_balances), sender, ); @@ -578,6 +594,7 @@ impl BankingStage { poh: &Arc>, chunk_offset: usize, transaction_status_sender: Option, + gossip_vote_sender: &ReplayVoteSender, ) -> (Result, Vec) { let mut lock_time = Measure::start("lock_time"); // Once accounts are locked, other threads cannot encode transactions that will modify the @@ -590,6 +607,7 @@ impl BankingStage { poh, &batch, transaction_status_sender, + gossip_vote_sender, ); retryable_txs.iter_mut().for_each(|x| *x += chunk_offset); @@ -618,6 +636,7 @@ impl BankingStage { transactions: &[Transaction], poh: &Arc>, transaction_status_sender: Option, + gossip_vote_sender: &ReplayVoteSender, ) -> (usize, Vec) { let mut chunk_start = 0; let mut unprocessed_txs = vec![]; @@ -633,6 +652,7 @@ impl BankingStage { poh, chunk_start, transaction_status_sender.clone(), + gossip_vote_sender, ); trace!("process_transactions result: {:?}", result); @@ -764,6 +784,7 @@ impl BankingStage { msgs: &Packets, packet_indexes: Vec, transaction_status_sender: Option, + gossip_vote_sender: &ReplayVoteSender, ) -> (usize, usize, Vec) { let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(msgs, &packet_indexes); @@ -775,8 +796,13 @@ impl BankingStage { let tx_len = transactions.len(); - let (processed, unprocessed_tx_indexes) = - Self::process_transactions(bank, &transactions, poh, transaction_status_sender); + let (processed, unprocessed_tx_indexes) = Self::process_transactions( + bank, + &transactions, + poh, + transaction_status_sender, + gossip_vote_sender, + ); let unprocessed_tx_count = unprocessed_tx_indexes.len(); @@ -846,6 +872,7 @@ impl BankingStage { .collect() } + #[allow(clippy::too_many_arguments)] /// Process the incoming packets pub fn process_packets( my_pubkey: &Pubkey, @@ -856,6 +883,7 @@ impl BankingStage { id: u32, batch_limit: usize, transaction_status_sender: Option, + gossip_vote_sender: &ReplayVoteSender, ) -> Result { let mut recv_time = Measure::start("process_packets_recv"); let mms = verified_receiver.recv_timeout(recv_timeout)?; @@ -898,6 +926,7 @@ impl BankingStage { &msgs, packet_indexes, transaction_status_sender.clone(), + gossip_vote_sender, ); new_tx_count += processed; @@ -1045,6 +1074,7 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_config)); let (verified_sender, verified_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded(); + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -1061,6 +1091,7 @@ mod tests { verified_receiver, vote_receiver, None, + gossip_vote_sender, ); drop(verified_sender); drop(vote_sender); @@ -1095,12 +1126,15 @@ mod tests { create_test_recorder(&bank, &blockstore, Some(poh_config)); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(cluster_info); + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, verified_receiver, vote_receiver, None, + gossip_vote_sender, ); trace!("sending bank"); drop(verified_sender); @@ -1158,12 +1192,15 @@ mod tests { create_test_recorder(&bank, &blockstore, Some(poh_config)); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(cluster_info); + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, verified_receiver, vote_receiver, None, + gossip_vote_sender, ); // fund another account so we can send 2 good transactions in a single batch. @@ -1284,6 +1321,8 @@ mod tests { let (vote_sender, vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path!(); { + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let entry_receiver = { // start a banking_stage to eat verified receiver let bank = Arc::new(Bank::new(&genesis_config)); @@ -1306,6 +1345,7 @@ mod tests { vote_receiver, 2, None, + gossip_vote_sender, ); // wait for banking_stage to eat the packets @@ -1683,6 +1723,7 @@ mod tests { let poh_recorder = Arc::new(Mutex::new(poh_recorder)); poh_recorder.lock().unwrap().set_working_bank(working_bank); + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); BankingStage::process_and_record_transactions( &bank, @@ -1690,6 +1731,7 @@ mod tests { &poh_recorder, 0, None, + &gossip_vote_sender, ) .0 .unwrap(); @@ -1726,6 +1768,7 @@ mod tests { &poh_recorder, 0, None, + &gossip_vote_sender, ) .0, Err(PohRecorderError::MaxHeightReached) @@ -1777,12 +1820,15 @@ mod tests { poh_recorder.lock().unwrap().set_working_bank(working_bank); + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let (result, unprocessed) = BankingStage::process_and_record_transactions( &bank, &transactions, &poh_recorder, 0, None, + &gossip_vote_sender, ); assert!(result.is_ok()); @@ -1866,8 +1912,16 @@ mod tests { // record let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let (processed_transactions_count, mut retryable_txs) = - BankingStage::process_transactions(&bank, &transactions, &poh_recorder, None); + BankingStage::process_transactions( + &bank, + &transactions, + &poh_recorder, + None, + &gossip_vote_sender, + ); assert_eq!(processed_transactions_count, 0,); @@ -1944,12 +1998,15 @@ mod tests { &Arc::new(AtomicBool::new(false)), ); + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let _ = BankingStage::process_and_record_transactions( &bank, &transactions, &poh_recorder, 0, Some(transaction_status_sender), + &gossip_vote_sender, ); transaction_status_service.join().unwrap(); diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 81fcee1973..d0e4c0fbfd 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -15,10 +15,7 @@ use crossbeam_channel::{ }; use itertools::izip; use log::*; -use solana_ledger::{ - blockstore::Blockstore, - blockstore_processor::{ReplayVotesReceiver, ReplayedVote}, -}; +use solana_ledger::blockstore::Blockstore; use solana_metrics::inc_new_counter_debug; use solana_perf::packet::{self, Packets}; use solana_runtime::{ @@ -26,6 +23,7 @@ use solana_runtime::{ bank_forks::BankForks, epoch_stakes::{EpochAuthorizedVoters, EpochStakes}, stakes::Stakes, + vote_sender_types::{ReplayVoteReceiver, ReplayedVote}, }; use solana_sdk::{ clock::{Epoch, Slot}, @@ -248,7 +246,7 @@ impl ClusterInfoVoteListener { bank_forks: Arc>, subscriptions: Arc, verified_vote_sender: VerifiedVoteSender, - replay_votes_receiver: ReplayVotesReceiver, + replay_votes_receiver: ReplayVoteReceiver, blockstore: Arc, ) -> Self { let exit_ = exit.clone(); @@ -420,7 +418,7 @@ impl ClusterInfoVoteListener { bank_forks: Arc>, subscriptions: Arc, verified_vote_sender: VerifiedVoteSender, - replay_votes_receiver: ReplayVotesReceiver, + replay_votes_receiver: ReplayVoteReceiver, blockstore: Arc, ) -> Result<()> { let mut optimistic_confirmation_verifier = @@ -478,7 +476,7 @@ impl ClusterInfoVoteListener { root_bank: &Bank, subscriptions: &RpcSubscriptions, verified_vote_sender: &VerifiedVoteSender, - replay_votes_receiver: &ReplayVotesReceiver, + replay_votes_receiver: &ReplayVoteReceiver, ) -> Result> { Self::get_and_process_votes( gossip_vote_txs_receiver, @@ -496,7 +494,7 @@ impl ClusterInfoVoteListener { root_bank: &Bank, subscriptions: &RpcSubscriptions, verified_vote_sender: &VerifiedVoteSender, - replay_votes_receiver: &ReplayVotesReceiver, + replay_votes_receiver: &ReplayVoteReceiver, ) -> Result> { let mut sel = Select::new(); sel.recv(gossip_vote_txs_receiver); @@ -772,12 +770,12 @@ impl ClusterInfoVoteListener { #[cfg(test)] mod tests { use super::*; - use solana_ledger::blockstore_processor::ReplayVotesSender; use solana_perf::packet; use solana_runtime::{ bank::Bank, commitment::BlockCommitmentCache, genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}, + vote_sender_types::ReplayVoteSender, }; use solana_sdk::{ hash::Hash, @@ -1040,7 +1038,7 @@ mod tests { validator_voting_keypairs: &[ValidatorVoteKeypairs], switch_proof_hash: Option, votes_sender: &VerifiedVoteTransactionsSender, - replay_votes_sender: &ReplayVotesSender, + replay_votes_sender: &ReplayVoteSender, ) { validator_voting_keypairs.iter().for_each(|keypairs| { let node_keypair = &keypairs.node_keypair; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index c87c66e0ea..733b329fc1 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -21,9 +21,7 @@ use crate::{ use solana_ledger::{ block_error::BlockError, blockstore::Blockstore, - blockstore_processor::{ - self, BlockstoreProcessorError, ReplayVotesSender, TransactionStatusSender, - }, + blockstore_processor::{self, BlockstoreProcessorError, TransactionStatusSender}, entry::VerifyRecyclers, leader_schedule_cache::LeaderScheduleCache, }; @@ -31,7 +29,7 @@ use solana_measure::{measure::Measure, thread_mem_usage}; use solana_metrics::inc_new_counter_info; use solana_runtime::{ bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache, - snapshot_package::AccountsPackageSender, + snapshot_package::AccountsPackageSender, vote_sender_types::ReplayVoteSender, }; use solana_sdk::{ clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, @@ -223,7 +221,7 @@ impl ReplayStage { cluster_slots: Arc, retransmit_slots_sender: RetransmitSlotsSender, duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver, - replay_votes_sender: ReplayVotesSender, + replay_vote_sender: ReplayVoteSender, ) -> Self { let ReplayStageConfig { my_pubkey, @@ -344,7 +342,7 @@ impl ReplayStage { &verify_recyclers, &mut heaviest_subtree_fork_choice, &subscriptions, - &replay_votes_sender, + &replay_vote_sender, ); replay_active_banks_time.stop(); Self::report_memory(&allocated, "replay_active_banks", start); @@ -940,7 +938,7 @@ impl ReplayStage { blockstore: &Blockstore, bank_progress: &mut ForkProgress, transaction_status_sender: Option, - replay_votes_sender: &ReplayVotesSender, + replay_vote_sender: &ReplayVoteSender, verify_recyclers: &VerifyRecyclers, ) -> result::Result { let tx_count_before = bank_progress.replay_progress.num_txs; @@ -951,7 +949,7 @@ impl ReplayStage { &mut bank_progress.replay_progress, false, transaction_status_sender, - Some(replay_votes_sender), + Some(replay_vote_sender), None, verify_recyclers, ); @@ -1214,7 +1212,7 @@ impl ReplayStage { verify_recyclers: &VerifyRecyclers, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, subscriptions: &Arc, - replay_votes_sender: &ReplayVotesSender, + replay_vote_sender: &ReplayVoteSender, ) -> bool { let mut did_complete_bank = false; let mut tx_count = 0; @@ -1260,7 +1258,7 @@ impl ReplayStage { &blockstore, bank_progress, transaction_status_sender.clone(), - replay_votes_sender, + replay_vote_sender, verify_recyclers, ); match replay_result { @@ -2417,7 +2415,7 @@ pub(crate) mod tests { F: Fn(&Keypair, Arc) -> Vec, { let ledger_path = get_tmp_ledger_path!(); - let (replay_votes_sender, _replay_votes_receiver) = unbounded(); + let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let res = { let blockstore = Arc::new( Blockstore::open(&ledger_path) @@ -2442,8 +2440,8 @@ pub(crate) mod tests { &blockstore, &mut bank0_progress, None, - &replay_votes_sender, - &VerifyRecyclers::default(), + &replay_vote_sender, + &&VerifyRecyclers::default(), ); // Check that the erroring bank was marked as dead in the progress map @@ -2606,7 +2604,7 @@ pub(crate) mod tests { blockstore.set_roots(&[slot]).unwrap(); let (transaction_status_sender, transaction_status_receiver) = unbounded(); - let (replay_votes_sender, _replay_votes_receiver) = unbounded(); + let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let transaction_status_service = TransactionStatusService::new( transaction_status_receiver, blockstore, @@ -2620,7 +2618,7 @@ pub(crate) mod tests { &entries, true, Some(transaction_status_sender), - Some(&replay_votes_sender), + Some(&replay_vote_sender), ); transaction_status_service.join().unwrap(); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 72f2901f44..60c535367b 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -13,11 +13,11 @@ use crate::{ sigverify_stage::SigVerifyStage, }; use crossbeam_channel::unbounded; -use solana_ledger::{ - blockstore::Blockstore, - blockstore_processor::{ReplayVotesReceiver, TransactionStatusSender}, +use solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender}; +use solana_runtime::{ + bank_forks::BankForks, + vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender}, }; -use solana_runtime::bank_forks::BankForks; use std::{ net::UdpSocket, sync::{ @@ -55,7 +55,8 @@ impl Tpu { vote_tracker: Arc, bank_forks: Arc>, verified_vote_sender: VerifiedVoteSender, - replay_votes_receiver: ReplayVotesReceiver, + replay_vote_receiver: ReplayVoteReceiver, + replay_vote_sender: ReplayVoteSender, ) -> Self { let (packet_sender, packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( @@ -82,7 +83,7 @@ impl Tpu { bank_forks, subscriptions.clone(), verified_vote_sender, - replay_votes_receiver, + replay_vote_receiver, blockstore.clone(), ); @@ -92,6 +93,7 @@ impl Tpu { verified_receiver, verified_vote_packets_receiver, transaction_status_sender, + replay_vote_sender, ); let broadcast_stage = broadcast_type.new_broadcast_stage( diff --git a/core/src/transaction_status_service.rs b/core/src/transaction_status_service.rs index 926adad150..59d9540700 100644 --- a/core/src/transaction_status_service.rs +++ b/core/src/transaction_status_service.rs @@ -57,7 +57,7 @@ impl TransactionStatusService { } = write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))?; let slot = bank.slot(); - for (((transaction, (status, hash_age_kind)), pre_balances), post_balances) in + for ((((_, transaction), (status, hash_age_kind)), pre_balances), post_balances) in OrderedIterator::new(&transactions, iteration_order.as_deref()) .zip(statuses) .zip(balances.pre_balances) diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 3531f4ab21..ca04e302ee 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -21,12 +21,12 @@ use crate::{ use crossbeam_channel::unbounded; use solana_ledger::{ blockstore::{Blockstore, CompletedSlotsReceiver}, - blockstore_processor::{ReplayVotesSender, TransactionStatusSender}, + blockstore_processor::TransactionStatusSender, leader_schedule_cache::LeaderScheduleCache, }; use solana_runtime::{ bank_forks::BankForks, commitment::BlockCommitmentCache, - snapshot_package::AccountsPackageSender, + snapshot_package::AccountsPackageSender, vote_sender_types::ReplayVoteSender, }; use solana_sdk::{ pubkey::Pubkey, @@ -98,7 +98,7 @@ impl Tvu { vote_tracker: Arc, retransmit_slots_sender: RetransmitSlotsSender, verified_vote_receiver: VerifiedVoteReceiver, - replay_votes_sender: ReplayVotesSender, + replay_vote_sender: ReplayVoteSender, tvu_config: TvuConfig, ) -> Self { let keypair: Arc = cluster_info.keypair.clone(); @@ -199,7 +199,7 @@ impl Tvu { cluster_slots, retransmit_slots_sender, duplicate_slots_reset_receiver, - replay_votes_sender, + replay_vote_sender, ); let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { @@ -285,7 +285,7 @@ pub mod tests { let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded(); let (_verified_vote_sender, verified_vote_receiver) = unbounded(); - let (replay_votes_sender, _replay_votes_receiver) = unbounded(); + let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let bank_forks = Arc::new(RwLock::new(bank_forks)); let tvu = Tvu::new( &vote_keypair.pubkey(), @@ -319,7 +319,7 @@ pub mod tests { Arc::new(VoteTracker::new(&bank)), retransmit_slots_sender, verified_vote_receiver, - replay_votes_sender, + replay_vote_sender, TvuConfig::default(), ); exit.store(true, Ordering::Relaxed); diff --git a/core/src/validator.rs b/core/src/validator.rs index 41e60fcddc..a10abae721 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -28,7 +28,7 @@ use solana_ledger::{ bank_forks_utils, blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType}, blockstore_db::BlockstoreRecoveryMode, - blockstore_processor::{self, ReplayVotesSender, TransactionStatusSender}, + blockstore_processor::{self, TransactionStatusSender}, create_new_tmp_ledger, leader_schedule::FixedSchedule, leader_schedule_cache::LeaderScheduleCache, @@ -224,7 +224,7 @@ impl Validator { validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed))); let validator_exit = Arc::new(RwLock::new(Some(validator_exit))); - let (replay_votes_sender, replay_votes_receiver) = unbounded(); + let (replay_vote_sender, replay_vote_receiver) = unbounded(); let ( genesis_config, bank_forks, @@ -239,7 +239,7 @@ impl Validator { rewards_recorder_sender, rewards_recorder_service, }, - ) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit, &replay_votes_sender); + ) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit); let leader_schedule_cache = Arc::new(leader_schedule_cache); let bank = bank_forks.working_bank(); @@ -465,7 +465,7 @@ impl Validator { vote_tracker.clone(), retransmit_slots_sender, verified_vote_receiver, - replay_votes_sender, + replay_vote_sender.clone(), TvuConfig { max_ledger_shreds: config.max_ledger_shreds, halt_on_trusted_validators_accounts_hash_mismatch: config @@ -493,7 +493,8 @@ impl Validator { vote_tracker, bank_forks, verified_vote_sender, - replay_votes_receiver, + replay_vote_receiver, + replay_vote_sender, ); datapoint_info!("validator-new", ("id", id.to_string(), String)); @@ -587,7 +588,6 @@ fn new_banks_from_ledger( ledger_path: &Path, poh_verify: bool, exit: &Arc, - replay_votes_sender: &ReplayVotesSender, ) -> ( GenesisConfig, BankForks, @@ -649,7 +649,6 @@ fn new_banks_from_ledger( transaction_history_services .transaction_status_sender .clone(), - Some(&replay_votes_sender), ) .unwrap_or_else(|err| { error!("Failed to load ledger: {:?}", err); diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index b781ee777f..09163db222 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -688,7 +688,6 @@ fn load_bank_forks( snapshot_config.as_ref(), process_options, None, - None, ) } diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 96f93d781f..4002b7898d 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -2,7 +2,7 @@ use crate::{ blockstore::Blockstore, blockstore_processor::{ self, BlockstoreProcessorError, BlockstoreProcessorResult, ProcessOptions, - ReplayVotesSender, TransactionStatusSender, + TransactionStatusSender, }, entry::VerifyRecyclers, leader_schedule_cache::LeaderScheduleCache, @@ -36,7 +36,6 @@ pub fn load( snapshot_config: Option<&SnapshotConfig>, process_options: ProcessOptions, transaction_status_sender: Option, - replay_votes_sender: Option<&ReplayVotesSender>, ) -> LoadResult { if let Some(snapshot_config) = snapshot_config.as_ref() { info!( @@ -90,7 +89,6 @@ pub fn load( &process_options, &VerifyRecyclers::default(), transaction_status_sender, - replay_votes_sender, ), Some(deserialized_snapshot_hash), ); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index a3519a6be6..a482adfa81 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -6,7 +6,7 @@ use crate::{ entry::{create_ticks, Entry, EntrySlice, EntryVerificationStatus, VerifyRecyclers}, leader_schedule_cache::LeaderScheduleCache, }; -use crossbeam_channel::{Receiver, Sender}; +use crossbeam_channel::Sender; use itertools::Itertools; use log::*; use rand::{seq::SliceRandom, thread_rng}; @@ -17,8 +17,10 @@ use solana_rayon_threadlimit::get_thread_count; use solana_runtime::{ bank::{Bank, TransactionBalancesSet, TransactionProcessResult, TransactionResults}, bank_forks::BankForks, + bank_utils, transaction_batch::TransactionBatch, transaction_utils::OrderedIterator, + vote_sender_types::ReplayVoteSender, }; use solana_sdk::{ clock::{Slot, MAX_PROCESSING_AGE}, @@ -29,7 +31,6 @@ use solana_sdk::{ timing::duration_as_ms, transaction::{Result, Transaction, TransactionError}, }; -use solana_vote_program::{vote_state::Vote, vote_transaction}; use std::{ cell::RefCell, collections::HashMap, @@ -43,10 +44,6 @@ use thiserror::Error; pub type BlockstoreProcessorResult = result::Result<(BankForks, LeaderScheduleCache), BlockstoreProcessorError>; -pub type ReplayedVote = (Pubkey, Vote, Option); -pub type ReplayVotesSender = Sender; -pub type ReplayVotesReceiver = Receiver; - thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) .thread_name(|ix| format!("blockstore_processor_{}", ix)) @@ -69,7 +66,7 @@ fn get_first_error( fee_collection_results: Vec>, ) -> Option<(Result<()>, Signature)> { let mut first_err = None; - for (result, transaction) in fee_collection_results.iter().zip(OrderedIterator::new( + for (result, (_, transaction)) in fee_collection_results.iter().zip(OrderedIterator::new( batch.transactions(), batch.iteration_order(), )) { @@ -98,32 +95,21 @@ fn execute_batch( batch: &TransactionBatch, bank: &Arc, transaction_status_sender: Option, - replay_votes_sender: Option<&ReplayVotesSender>, + replay_vote_sender: Option<&ReplayVoteSender>, ) -> Result<()> { - let ( - TransactionResults { - fee_collection_results, - processing_results, - }, - balances, - ) = batch.bank().load_execute_and_commit_transactions( + let (tx_results, balances) = batch.bank().load_execute_and_commit_transactions( batch, MAX_PROCESSING_AGE, transaction_status_sender.is_some(), ); - if let Some(replay_votes_sender) = replay_votes_sender { - for (transaction, (processing_result, _)) in - OrderedIterator::new(batch.transactions(), batch.iteration_order()) - .zip(&processing_results) - { - if processing_result.is_ok() { - if let Some(parsed_vote) = vote_transaction::parse_vote_transaction(transaction) { - let _ = replay_votes_sender.send(parsed_vote); - } - } - } - } + bank_utils::find_and_send_votes(batch.transactions(), &tx_results, replay_vote_sender); + + let TransactionResults { + fee_collection_results, + processing_results, + .. + } = tx_results; if let Some(sender) = transaction_status_sender { send_transaction_status_batch( @@ -145,7 +131,7 @@ fn execute_batches( batches: &[TransactionBatch], entry_callback: Option<&ProcessCallback>, transaction_status_sender: Option, - replay_votes_sender: Option<&ReplayVotesSender>, + replay_vote_sender: Option<&ReplayVoteSender>, ) -> Result<()> { inc_new_counter_debug!("bank-par_execute_entries-count", batches.len()); let results: Vec> = PAR_THREAD_POOL.with(|thread_pool| { @@ -153,7 +139,7 @@ fn execute_batches( batches .into_par_iter() .map_with(transaction_status_sender, |sender, batch| { - let result = execute_batch(batch, bank, sender.clone(), replay_votes_sender); + let result = execute_batch(batch, bank, sender.clone(), replay_vote_sender); if let Some(entry_callback) = entry_callback { entry_callback(bank); } @@ -176,7 +162,7 @@ pub fn process_entries( entries: &[Entry], randomize: bool, transaction_status_sender: Option, - replay_votes_sender: Option<&ReplayVotesSender>, + replay_vote_sender: Option<&ReplayVoteSender>, ) -> Result<()> { process_entries_with_callback( bank, @@ -184,7 +170,7 @@ pub fn process_entries( randomize, None, transaction_status_sender, - replay_votes_sender, + replay_vote_sender, ) } @@ -194,7 +180,7 @@ fn process_entries_with_callback( randomize: bool, entry_callback: Option<&ProcessCallback>, transaction_status_sender: Option, - replay_votes_sender: Option<&ReplayVotesSender>, + replay_vote_sender: Option<&ReplayVoteSender>, ) -> Result<()> { // accumulator for entries that can be processed in parallel let mut batches = vec![]; @@ -211,7 +197,7 @@ fn process_entries_with_callback( &batches, entry_callback, transaction_status_sender.clone(), - replay_votes_sender, + replay_vote_sender, )?; batches.clear(); for hash in &tick_hashes { @@ -267,7 +253,7 @@ fn process_entries_with_callback( &batches, entry_callback, transaction_status_sender.clone(), - replay_votes_sender, + replay_vote_sender, )?; batches.clear(); } @@ -278,7 +264,7 @@ fn process_entries_with_callback( &batches, entry_callback, transaction_status_sender, - replay_votes_sender, + replay_vote_sender, )?; for hash in tick_hashes { bank.register_tick(&hash); @@ -345,15 +331,7 @@ pub fn process_blockstore( info!("processing ledger for slot 0..."); let recyclers = VerifyRecyclers::default(); process_bank_0(&bank0, blockstore, &opts, &recyclers)?; - process_blockstore_from_root( - genesis_config, - blockstore, - bank0, - &opts, - &recyclers, - None, - None, - ) + process_blockstore_from_root(genesis_config, blockstore, bank0, &opts, &recyclers, None) } // Process blockstore from a known root bank @@ -364,7 +342,6 @@ pub fn process_blockstore_from_root( opts: &ProcessOptions, recyclers: &VerifyRecyclers, transaction_status_sender: Option, - replay_votes_sender: Option<&ReplayVotesSender>, ) -> BlockstoreProcessorResult { info!("processing ledger from slot {}...", bank.slot()); let allocated = thread_mem_usage::Allocatedp::default(); @@ -430,7 +407,6 @@ pub fn process_blockstore_from_root( opts, recyclers, transaction_status_sender, - replay_votes_sender, )?; (initial_forks, leader_schedule_cache) } else { @@ -520,7 +496,7 @@ fn confirm_full_slot( recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, transaction_status_sender: Option, - replay_votes_sender: Option<&ReplayVotesSender>, + replay_vote_sender: Option<&ReplayVoteSender>, ) -> result::Result<(), BlockstoreProcessorError> { let mut timing = ConfirmationTiming::default(); let skip_verification = !opts.poh_verify; @@ -531,7 +507,7 @@ fn confirm_full_slot( progress, skip_verification, transaction_status_sender, - replay_votes_sender, + replay_vote_sender, opts.entry_callback.as_ref(), recyclers, )?; @@ -592,7 +568,7 @@ pub fn confirm_slot( progress: &mut ConfirmationProgress, skip_verification: bool, transaction_status_sender: Option, - replay_votes_sender: Option<&ReplayVotesSender>, + replay_vote_sender: Option<&ReplayVoteSender>, entry_callback: Option<&ProcessCallback>, recyclers: &VerifyRecyclers, ) -> result::Result<(), BlockstoreProcessorError> { @@ -660,7 +636,7 @@ pub fn confirm_slot( true, entry_callback, transaction_status_sender, - replay_votes_sender, + replay_vote_sender, ) .map_err(BlockstoreProcessorError::from); replay_elapsed.stop(); @@ -779,7 +755,6 @@ fn load_frozen_forks( opts: &ProcessOptions, recyclers: &VerifyRecyclers, transaction_status_sender: Option, - replay_votes_sender: Option<&ReplayVotesSender>, ) -> result::Result>, BlockstoreProcessorError> { let mut initial_forks = HashMap::new(); let mut last_status_report = Instant::now(); @@ -819,6 +794,7 @@ fn load_frozen_forks( let initial_allocation = allocated.get(); let mut progress = ConfirmationProgress::new(last_entry_hash); + if process_single_slot( blockstore, &bank, @@ -826,7 +802,7 @@ fn load_frozen_forks( recyclers, &mut progress, transaction_status_sender.clone(), - replay_votes_sender, + None, ) .is_err() { @@ -877,11 +853,11 @@ fn process_single_slot( recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, transaction_status_sender: Option, - replay_votes_sender: Option<&ReplayVotesSender>, + replay_vote_sender: Option<&ReplayVoteSender>, ) -> result::Result<(), BlockstoreProcessorError> { // Mark corrupt slots as dead so validators don't replay this slot and // see DuplicateSignature errors later in ReplayStage - confirm_full_slot(blockstore, bank, opts, recyclers, progress, transaction_status_sender, replay_votes_sender).map_err(|err| { + confirm_full_slot(blockstore, bank, opts, recyclers, progress, transaction_status_sender, replay_vote_sender).map_err(|err| { let slot = bank.slot(); warn!("slot {} failed to verify: {}", slot, err); if blockstore.is_primary_access() { @@ -987,6 +963,7 @@ pub mod tests { system_transaction, transaction::{Transaction, TransactionError}, }; + use solana_vote_program::vote_transaction; use std::{collections::BTreeSet, sync::RwLock}; #[test] @@ -2532,7 +2509,6 @@ pub mod tests { &opts, &recyclers, None, - None, ) .unwrap(); @@ -2760,7 +2736,7 @@ pub mod tests { let ( TransactionResults { fee_collection_results, - processing_results: _, + .. }, _balances, ) = batch @@ -2840,9 +2816,9 @@ pub mod tests { }) .collect(); let entry = next_entry(&bank_1_blockhash, 1, vote_txs); - let (replay_votes_sender, replay_votes_receiver) = unbounded(); - let _ = process_entries(&bank1, &[entry], true, None, Some(&replay_votes_sender)); - let successes: BTreeSet = replay_votes_receiver + let (replay_vote_sender, replay_vote_receiver) = unbounded(); + let _ = process_entries(&bank1, &[entry], true, None, Some(&replay_vote_sender)); + let successes: BTreeSet = replay_vote_receiver .try_iter() .map(|(vote_pubkey, _, _)| vote_pubkey) .collect(); diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 72775d83af..4c99002d93 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -250,6 +250,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.7.3" @@ -1840,6 +1850,7 @@ dependencies = [ "bv", "byteorder 1.3.4", "bzip2", + "crossbeam-channel", "dir-diff", "flate2", "fnv", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 311b916005..29f4a819c5 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -13,6 +13,7 @@ bincode = "1.3.1" bv = { version = "0.11.1", features = ["serde"] } byteorder = "1.3.4" bzip2 = "0.3.3" +crossbeam-channel = "0.4" dir-diff = "0.3.2" flate2 = "1.0.14" fnv = "1.0.7" diff --git a/runtime/benches/transaction_utils.rs b/runtime/benches/transaction_utils.rs index ad5b1667e1..7c45718fa7 100644 --- a/runtime/benches/transaction_utils.rs +++ b/runtime/benches/transaction_utils.rs @@ -12,7 +12,7 @@ fn bench_ordered_iterator_with_order_shuffling(bencher: &mut Bencher) { bencher.iter(|| { let mut order: Vec = (0..100_usize).collect(); order.shuffle(&mut thread_rng()); - let _ordered_iterator_resp: Vec<&usize> = + let _ordered_iterator_resp: Vec<(usize, &usize)> = OrderedIterator::new(&vec, Some(&order)).collect(); }); } diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 98f3948e47..f30668a3c0 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -281,7 +281,7 @@ impl Accounts { OrderedIterator::new(txs, txs_iteration_order) .zip(lock_results.into_iter()) .map(|etx| match etx { - (tx, (Ok(()), hash_age_kind)) => { + ((_, tx), (Ok(()), hash_age_kind)) => { let fee_calculator = match hash_age_kind.as_ref() { Some(HashAgeKind::DurableNonce(_, account)) => { nonce_utils::fee_calculator_of(account) @@ -612,7 +612,7 @@ impl Accounts { ) -> Vec> { use solana_sdk::sanitize::Sanitize; let keys: Vec> = OrderedIterator::new(txs, txs_iteration_order) - .map(|tx| { + .map(|(_, tx)| { tx.sanitize().map_err(TransactionError::from)?; if Self::has_duplicates(&tx.message.account_keys) { @@ -645,7 +645,7 @@ impl Accounts { OrderedIterator::new(txs, txs_iteration_order) .zip(results.iter()) - .for_each(|(tx, result)| self.unlock_account(tx, result, &mut account_locks)); + .for_each(|((_, tx), result)| self.unlock_account(tx, result, &mut account_locks)); } /// Store the accounts into the DB @@ -697,7 +697,7 @@ impl Accounts { fix_recent_blockhashes_sysvar_delay: bool, ) -> Vec<(&'a Pubkey, &'a Account)> { let mut accounts = Vec::with_capacity(loaded.len()); - for (i, ((raccs, _hash_age_kind), tx)) in loaded + for (i, ((raccs, _hash_age_kind), (_, tx))) in loaded .iter_mut() .zip(OrderedIterator::new(txs, txs_iteration_order)) .enumerate() diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index ba904dfe4c..bec1b0d21f 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -175,11 +175,18 @@ pub type TransactionProcessResult = (Result<()>, Option); pub struct TransactionResults { pub fee_collection_results: Vec>, pub processing_results: Vec, + pub overwritten_vote_accounts: Vec, } pub struct TransactionBalancesSet { pub pre_balances: TransactionBalances, pub post_balances: TransactionBalances, } +pub struct OverwrittenVoteAccount { + pub account: Account, + pub transaction_index: usize, + pub transaction_result_index: usize, +} + impl TransactionBalancesSet { pub fn new(pre_balances: TransactionBalances, post_balances: TransactionBalances) -> Self { assert_eq!(pre_balances.len(), post_balances.len()); @@ -1276,7 +1283,7 @@ impl Bank { res: &[TransactionProcessResult], ) { let mut status_cache = self.src.status_cache.write().unwrap(); - for (i, tx) in OrderedIterator::new(txs, iteration_order).enumerate() { + for (i, (_, tx)) in OrderedIterator::new(txs, iteration_order).enumerate() { let (res, _hash_age_kind) = &res[i]; if Self::can_commit(res) && !tx.signatures.is_empty() { status_cache.insert( @@ -1422,7 +1429,7 @@ impl Bank { let hash_queue = self.blockhash_queue.read().unwrap(); OrderedIterator::new(txs, iteration_order) .zip(lock_results.into_iter()) - .map(|(tx, lock_res)| match lock_res { + .map(|((_, tx), lock_res)| match lock_res { Ok(()) => { let message = tx.message(); let hash_age = hash_queue.check_hash_age(&message.recent_blockhash, max_age); @@ -1452,7 +1459,7 @@ impl Bank { let rcache = self.src.status_cache.read().unwrap(); OrderedIterator::new(txs, iteration_order) .zip(lock_results.into_iter()) - .map(|(tx, lock_res)| { + .map(|((_, tx), lock_res)| { if tx.signatures.is_empty() { return lock_res; } @@ -1487,7 +1494,7 @@ impl Bank { ) -> Vec { OrderedIterator::new(txs, iteration_order) .zip(lock_results.into_iter()) - .map(|(tx, lock_res)| { + .map(|((_, tx), lock_res)| { if lock_res.0.is_ok() { if tx.message.instructions.len() == 1 { let instruction = &tx.message.instructions[0]; @@ -1579,7 +1586,8 @@ impl Bank { pub fn collect_balances(&self, batch: &TransactionBatch) -> TransactionBalances { let mut balances: TransactionBalances = vec![]; - for transaction in OrderedIterator::new(batch.transactions(), batch.iteration_order()) { + for (_, transaction) in OrderedIterator::new(batch.transactions(), batch.iteration_order()) + { let mut transaction_balances: Vec = vec![]; for account_key in transaction.message.account_keys.iter() { transaction_balances.push(self.get_balance(account_key)); @@ -1728,7 +1736,7 @@ impl Bank { let retryable_txs: Vec<_> = OrderedIterator::new(batch.lock_results(), batch.iteration_order()) .enumerate() - .filter_map(|(index, res)| match res { + .filter_map(|(index, (_, res))| match res { Err(TransactionError::AccountInUse) => { error_counters.account_in_use += 1; Some(index) @@ -1758,7 +1766,7 @@ impl Bank { let executed: Vec = loaded_accounts .iter_mut() .zip(OrderedIterator::new(txs, batch.iteration_order())) - .map(|(accs, tx)| match accs { + .map(|(accs, (_, tx))| match accs { (Err(e), hash_age_kind) => (Err(e.clone()), hash_age_kind.clone()), (Ok((accounts, loaders, _rents)), hash_age_kind) => { signature_count += u64::from(tx.message().header.num_required_signatures); @@ -1837,7 +1845,7 @@ impl Bank { let mut fees = 0; let results = OrderedIterator::new(txs, iteration_order) .zip(executed.iter()) - .map(|(tx, (res, hash_age_kind))| { + .map(|((_, tx), (res, hash_age_kind))| { let (fee_calculator, is_durable_nonce) = match hash_age_kind { Some(HashAgeKind::DurableNonce(_, account)) => { (nonce_utils::fee_calculator_of(account), true) @@ -1921,7 +1929,8 @@ impl Bank { ); self.collect_rent(executed, loaded_accounts); - self.update_cached_accounts(txs, iteration_order, executed, loaded_accounts); + let overwritten_vote_accounts = + self.update_cached_accounts(txs, iteration_order, executed, loaded_accounts); // once committed there is no way to unroll write_time.stop(); @@ -1929,9 +1938,11 @@ impl Bank { self.update_transaction_statuses(txs, iteration_order, &executed); let fee_collection_results = self.filter_program_errors_and_collect_fee(txs, iteration_order, executed); + TransactionResults { fee_collection_results, processing_results: executed.to_vec(), + overwritten_vote_accounts, } } @@ -2866,8 +2877,9 @@ impl Bank { iteration_order: Option<&[usize]>, res: &[TransactionProcessResult], loaded: &[(Result, Option)], - ) { - for (i, ((raccs, _load_hash_age_kind), tx)) in loaded + ) -> Vec { + let mut overwritten_vote_accounts = vec![]; + for (i, ((raccs, _load_hash_age_kind), (transaction_index, tx))) in loaded .iter() .zip(OrderedIterator::new(txs, iteration_order)) .enumerate() @@ -2887,10 +2899,20 @@ impl Bank { .filter(|(_key, account)| (Stakes::is_stake(account))) { if Stakes::is_stake(account) { - self.stakes.write().unwrap().store(pubkey, account); + if let Some(old_vote_account) = + self.stakes.write().unwrap().store(pubkey, account) + { + overwritten_vote_accounts.push(OverwrittenVoteAccount { + account: old_vote_account, + transaction_index, + transaction_result_index: i, + }); + } } } } + + overwritten_vote_accounts } /// current stake delegations for this bank diff --git a/runtime/src/bank_utils.rs b/runtime/src/bank_utils.rs index 15c766a523..e6009471ae 100644 --- a/runtime/src/bank_utils.rs +++ b/runtime/src/bank_utils.rs @@ -1,8 +1,10 @@ use crate::{ - bank::Bank, + bank::{Bank, TransactionResults}, genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}, + vote_sender_types::ReplayVoteSender, }; -use solana_sdk::{pubkey::Pubkey, signature::Signer}; +use solana_sdk::{pubkey::Pubkey, signature::Signer, transaction::Transaction}; +use solana_vote_program::vote_transaction; pub fn setup_bank_and_vote_pubkeys(num_vote_accounts: usize, stake: u64) -> (Bank, Vec) { // Create some voters at genesis @@ -23,3 +25,28 @@ pub fn setup_bank_and_vote_pubkeys(num_vote_accounts: usize, stake: u64) -> (Ban let bank = Bank::new(&genesis_config); (bank, vote_pubkeys) } + +pub fn find_and_send_votes( + txs: &[Transaction], + tx_results: &TransactionResults, + vote_sender: Option<&ReplayVoteSender>, +) { + let TransactionResults { + processing_results, + overwritten_vote_accounts, + .. + } = tx_results; + if let Some(vote_sender) = vote_sender { + for old_account in overwritten_vote_accounts { + assert!(processing_results[old_account.transaction_result_index] + .0 + .is_ok()); + let transaction = &txs[old_account.transaction_index]; + if let Some(parsed_vote) = vote_transaction::parse_vote_transaction(transaction) { + if parsed_vote.1.slots.last().is_some() { + let _ = vote_sender.send(parsed_vote); + } + } + } + } +} diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 526ad7d4db..985f69aa5a 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -29,6 +29,7 @@ pub mod status_cache; mod system_instruction_processor; pub mod transaction_batch; pub mod transaction_utils; +pub mod vote_sender_types; extern crate solana_config_program; extern crate solana_stake_program; diff --git a/runtime/src/stakes.rs b/runtime/src/stakes.rs index 8a731a3e7e..5714eb3b75 100644 --- a/runtime/src/stakes.rs +++ b/runtime/src/stakes.rs @@ -105,20 +105,18 @@ impl Stakes { && account.data.len() >= std::mem::size_of::() } - pub fn store(&mut self, pubkey: &Pubkey, account: &Account) { + pub fn store(&mut self, pubkey: &Pubkey, account: &Account) -> Option { if solana_vote_program::check_id(&account.owner) { - if account.lamports == 0 { - self.vote_accounts.remove(pubkey); - } else { - let old = self.vote_accounts.get(pubkey); - - let stake = old.map_or_else( + let old = self.vote_accounts.remove(pubkey); + if account.lamports != 0 { + let stake = old.as_ref().map_or_else( || self.calculate_stake(pubkey, self.epoch, Some(&self.stake_history)), |v| v.0, ); self.vote_accounts.insert(*pubkey, (stake, account.clone())); } + old.map(|(_, account)| account) } else if solana_stake_program::check_id(&account.owner) { // old_stake is stake lamports and voter_pubkey from the pre-store() version let old_stake = self.stake_delegations.get(pubkey).map(|delegation| { @@ -160,6 +158,9 @@ impl Stakes { } else if let Some(delegation) = delegation { self.stake_delegations.insert(*pubkey, delegation); } + None + } else { + None } } diff --git a/runtime/src/transaction_utils.rs b/runtime/src/transaction_utils.rs index 8068c6a622..994163972f 100644 --- a/runtime/src/transaction_utils.rs +++ b/runtime/src/transaction_utils.rs @@ -21,7 +21,7 @@ impl<'a, T> OrderedIterator<'a, T> { } impl<'a, T> Iterator for OrderedIterator<'a, T> { - type Item = &'a T; + type Item = (usize, &'a T); fn next(&mut self) -> Option { if self.current >= self.vec.len() { None @@ -33,7 +33,7 @@ impl<'a, T> Iterator for OrderedIterator<'a, T> { index = self.current; } self.current += 1; - Some(self.vec.index(index)) + Some((index, self.vec.index(index))) } } } @@ -42,17 +42,22 @@ impl<'a, T> Iterator for OrderedIterator<'a, T> { mod tests { use super::*; + type IteratorResponse<'a> = Vec<(((usize, &'a usize), &'a usize), usize)>; + #[test] fn test_ordered_iterator_custom_order() { let vec: Vec = vec![1, 2, 3, 4]; let custom_order: Vec = vec![3, 1, 0, 2]; - + let custom_order_ = custom_order.clone(); let ordered_iterator = OrderedIterator::new(&vec, Some(&custom_order)); let expected_response: Vec = vec![4, 2, 1, 3]; - let resp: Vec<(&usize, &usize)> = ordered_iterator + let resp: IteratorResponse = ordered_iterator .zip(expected_response.iter()) - .filter(|(actual_elem, expected_elem)| *actual_elem == *expected_elem) + .zip(custom_order_) + .filter(|(((index, actual_elem), expected_elem), expected_index)| { + *actual_elem == *expected_elem && index == expected_index + }) .collect(); assert_eq!(resp.len(), custom_order.len()); @@ -63,9 +68,12 @@ mod tests { let vec: Vec = vec![1, 2, 3, 4]; let ordered_iterator = OrderedIterator::new(&vec, None); - let resp: Vec<(&usize, &usize)> = ordered_iterator + let resp: IteratorResponse = ordered_iterator .zip(vec.iter()) - .filter(|(actual_elem, expected_elem)| *actual_elem == *expected_elem) + .zip(0..=4) + .filter(|(((index, actual_elem), expected_elem), expected_index)| { + *actual_elem == *expected_elem && index == expected_index + }) .collect(); assert_eq!(resp.len(), vec.len()); diff --git a/runtime/src/vote_sender_types.rs b/runtime/src/vote_sender_types.rs new file mode 100644 index 0000000000..6d81b1916c --- /dev/null +++ b/runtime/src/vote_sender_types.rs @@ -0,0 +1,7 @@ +use crossbeam_channel::{Receiver, Sender}; +use solana_sdk::{hash::Hash, pubkey::Pubkey}; +use solana_vote_program::vote_state::Vote; + +pub type ReplayedVote = (Pubkey, Vote, Option); +pub type ReplayVoteSender = Sender; +pub type ReplayVoteReceiver = Receiver;