From 2f4a3ed190b652f6d337a7b9dfb79d1b63951c9c Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Wed, 17 Apr 2019 21:07:45 -0700 Subject: [PATCH] Use a separate channel to process votes in banking stage (#3861) - This will help expedite the vote processing on peer nodes --- core/benches/banking_stage.rs | 18 +++++++++++-- core/src/banking_stage.rs | 48 ++++++++++++++++++++++++++++++----- core/src/tpu.rs | 10 ++++++-- 3 files changed, 65 insertions(+), 11 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index fdd9f95382..01b199a510 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -55,6 +55,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { let (genesis_block, mint_keypair) = GenesisBlock::new(mint_total); let (verified_sender, verified_receiver) = channel(); + let (vote_sender, vote_receiver) = channel(); let bank = Arc::new(Bank::new(&genesis_block)); let dummy = system_transaction::transfer( &mint_keypair, @@ -116,7 +117,12 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { create_test_recorder(&bank, &blocktree); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); - let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + let _banking_stage = BankingStage::new( + &cluster_info, + &poh_recorder, + verified_receiver, + vote_receiver, + ); poh_recorder.lock().unwrap().set_bank(&bank); let mut id = genesis_block.hash(); @@ -138,6 +144,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { start += half_len; start %= verified.len(); }); + drop(vote_sender); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); } @@ -155,6 +162,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { let (genesis_block, mint_keypair) = GenesisBlock::new(mint_total); let (verified_sender, verified_receiver) = channel(); + let (vote_sender, vote_receiver) = channel(); let bank = Arc::new(Bank::new(&genesis_block)); let dummy = system_transaction::transfer( &mint_keypair, @@ -232,7 +240,12 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { create_test_recorder(&bank, &blocktree); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); - let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + let _banking_stage = BankingStage::new( + &cluster_info, + &poh_recorder, + verified_receiver, + vote_receiver, + ); poh_recorder.lock().unwrap().set_bank(&bank); let mut id = genesis_block.hash(); @@ -254,6 +267,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { start += half_len; start %= verified.len(); }); + drop(vote_sender); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 57981daeb2..389c16c4e0 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -21,6 +21,7 @@ use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{self, duration_as_us, MAX_RECENT_BLOCKHASHES}; use solana_sdk::transaction::{self, Transaction, TransactionError}; +use std::cmp; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError}; @@ -54,12 +55,14 @@ impl BankingStage { cluster_info: &Arc>, poh_recorder: &Arc>, verified_receiver: Receiver, + verified_vote_receiver: Receiver, ) -> Self { Self::new_num_threads( cluster_info, poh_recorder, verified_receiver, - Self::num_threads(), + verified_vote_receiver, + cmp::min(2, Self::num_threads()), ) } @@ -67,9 +70,11 @@ impl BankingStage { cluster_info: &Arc>, poh_recorder: &Arc>, verified_receiver: Receiver, + verified_vote_receiver: Receiver, num_threads: u32, ) -> Self { let verified_receiver = Arc::new(Mutex::new(verified_receiver)); + let verified_vote_receiver = Arc::new(Mutex::new(verified_vote_receiver)); // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. @@ -78,8 +83,13 @@ impl BankingStage { // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..num_threads) - .map(|_| { - let verified_receiver = verified_receiver.clone(); + .map(|i| { + let verified_receiver = if i < num_threads - 1 { + verified_receiver.clone() + } else { + verified_vote_receiver.clone() + }; + let poh_recorder = poh_recorder.clone(); let cluster_info = cluster_info.clone(); let exit = exit.clone(); @@ -629,6 +639,7 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let (verified_sender, verified_receiver) = channel(); + let (vote_sender, vote_receiver) = channel(); let ledger_path = get_tmp_ledger_path!(); { let blocktree = Arc::new( @@ -638,8 +649,14 @@ mod tests { create_test_recorder(&bank, &blocktree); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); - let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + let banking_stage = BankingStage::new( + &cluster_info, + &poh_recorder, + verified_receiver, + vote_receiver, + ); drop(verified_sender); + drop(vote_sender); exit.store(true, Ordering::Relaxed); banking_stage.join().unwrap(); poh_service.join().unwrap(); @@ -655,6 +672,7 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = channel(); + let (vote_sender, vote_receiver) = channel(); let ledger_path = get_tmp_ledger_path!(); { let blocktree = Arc::new( @@ -664,10 +682,16 @@ mod tests { create_test_recorder(&bank, &blocktree); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); - let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + let banking_stage = BankingStage::new( + &cluster_info, + &poh_recorder, + verified_receiver, + vote_receiver, + ); trace!("sending bank"); sleep(Duration::from_millis(600)); drop(verified_sender); + drop(vote_sender); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); drop(poh_recorder); @@ -693,6 +717,7 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = channel(); + let (vote_sender, vote_receiver) = channel(); let ledger_path = get_tmp_ledger_path!(); { let blocktree = Arc::new( @@ -702,7 +727,12 @@ mod tests { create_test_recorder(&bank, &blocktree); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); - let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + let banking_stage = BankingStage::new( + &cluster_info, + &poh_recorder, + verified_receiver, + vote_receiver, + ); // fund another account so we can send 2 good transactions in a single batch. let keypair = Keypair::new(); @@ -739,6 +769,7 @@ mod tests { .unwrap(); drop(verified_sender); + drop(vote_sender); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); drop(poh_recorder); @@ -816,6 +847,7 @@ mod tests { .send(vec![(packets[0].clone(), vec![1u8])]) .unwrap(); + let (vote_sender, vote_receiver) = channel(); let ledger_path = get_tmp_ledger_path!(); { let entry_receiver = { @@ -834,7 +866,8 @@ mod tests { &cluster_info, &poh_recorder, verified_receiver, - 1, + vote_receiver, + 2, ); // wait for banking_stage to eat the packets @@ -846,6 +879,7 @@ mod tests { entry_receiver }; drop(verified_sender); + drop(vote_sender); // consume the entire entry_receiver, feed it into a new bank // check that the balance is what we expect. diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 3962f83475..92a3749ea4 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -55,15 +55,21 @@ impl Tpu { let sigverify_stage = SigVerifyStage::new(packet_receiver, sigverify_disabled, verified_sender.clone()); + let (verified_vote_sender, verified_vote_receiver) = channel(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( &exit, cluster_info.clone(), sigverify_disabled, - verified_sender, + verified_vote_sender, &poh_recorder, ); - let banking_stage = BankingStage::new(&cluster_info, poh_recorder, verified_receiver); + let banking_stage = BankingStage::new( + &cluster_info, + poh_recorder, + verified_receiver, + verified_vote_receiver, + ); let broadcast_stage = BroadcastStage::new( broadcast_socket,