diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 80294ad737..9111282ba8 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -321,9 +321,6 @@ fn main() { .. } = create_genesis_config(mint_total); - let (verified_sender, verified_receiver) = unbounded(); - let (vote_sender, vote_receiver) = unbounded(); - let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let bank0 = Bank::new_for_benches(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0))); @@ -410,6 +407,9 @@ fn main() { let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank, &blockstore, None, Some(leader_schedule_cache)); + let (non_vote_sender, non_vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); + let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); let cluster_info = ClusterInfo::new( Node::new_localhost().info, Arc::new(Keypair::new()), @@ -424,9 +424,9 @@ fn main() { let banking_stage = BankingStage::new_num_threads( &cluster_info, &poh_recorder, - verified_receiver, + non_vote_receiver, tpu_vote_receiver, - vote_receiver, + gossip_vote_receiver, num_banking_threads, None, replay_vote_sender, @@ -461,7 +461,7 @@ fn main() { packet_batch_index, timestamp(), ); - verified_sender + non_vote_sender .send((vec![packet_batch.clone()], None)) .unwrap(); } @@ -574,9 +574,9 @@ fn main() { (1000.0 * 1000.0 * (txs_processed - base_tx_count) as f64) / (total_us as f64), ); - drop(verified_sender); + drop(non_vote_sender); drop(tpu_vote_sender); - drop(vote_sender); + drop(gossip_vote_sender); exit.store(true, Ordering::Relaxed); banking_stage.join().unwrap(); debug!("waited for banking_stage"); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index cab828f623..baa30a63f0 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -197,9 +197,10 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { // during the benchmark genesis_config.ticks_per_slot = 10_000; - let (verified_sender, verified_receiver) = unbounded(); + let (non_vote_sender, non_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let (vote_sender, vote_receiver) = unbounded(); + let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); + let mut bank = Bank::new_for_benches(&genesis_config); // Allow arbitrary transaction processing time for the purposes of this bench bank.ns_per_slot = u128::MAX; @@ -279,9 +280,9 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { let _banking_stage = BankingStage::new( &cluster_info, &poh_recorder, - verified_receiver, + non_vote_receiver, tpu_vote_receiver, - vote_receiver, + gossip_vote_receiver, None, s, None, @@ -305,7 +306,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { tpu_vote_sender .send((vote_packets[start..start + chunk_len].to_vec(), None)) .unwrap(); - vote_sender + gossip_vote_sender .send((vote_packets[start..start + chunk_len].to_vec(), None)) .unwrap(); } @@ -320,7 +321,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { for xv in v { sent += xv.len(); } - verified_sender.send((v.to_vec(), None)).unwrap(); + non_vote_sender.send((v.to_vec(), None)).unwrap(); } check_txs(&signal_receiver2, txes / CHUNKS); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 05e92582c1..ea9fff76e7 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -385,9 +385,9 @@ impl BankingStage { pub fn new( cluster_info: &Arc, poh_recorder: &Arc>, - verified_receiver: BankingPacketReceiver, - tpu_verified_vote_receiver: BankingPacketReceiver, - verified_vote_receiver: BankingPacketReceiver, + non_vote_receiver: BankingPacketReceiver, + tpu_vote_receiver: BankingPacketReceiver, + gossip_vote_receiver: BankingPacketReceiver, transaction_status_sender: Option, replay_vote_sender: ReplayVoteSender, log_messages_bytes_limit: Option, @@ -397,9 +397,9 @@ impl BankingStage { Self::new_num_threads( cluster_info, poh_recorder, - verified_receiver, - tpu_verified_vote_receiver, - verified_vote_receiver, + non_vote_receiver, + tpu_vote_receiver, + gossip_vote_receiver, Self::num_threads(), transaction_status_sender, replay_vote_sender, @@ -413,9 +413,9 @@ impl BankingStage { pub fn new_num_threads( cluster_info: &Arc, poh_recorder: &Arc>, - verified_receiver: BankingPacketReceiver, - tpu_verified_vote_receiver: BankingPacketReceiver, - verified_vote_receiver: BankingPacketReceiver, + non_vote_receiver: BankingPacketReceiver, + tpu_vote_receiver: BankingPacketReceiver, + gossip_vote_receiver: BankingPacketReceiver, num_threads: u32, transaction_status_sender: Option, replay_vote_sender: ReplayVoteSender, @@ -443,38 +443,38 @@ impl BankingStage { // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { - let (verified_receiver, unprocessed_transaction_storage) = + let (packet_receiver, unprocessed_transaction_storage) = match (i, should_split_voting_threads) { (0, false) => ( - verified_vote_receiver.clone(), + gossip_vote_receiver.clone(), UnprocessedTransactionStorage::new_transaction_storage( UnprocessedPacketBatches::with_capacity(batch_limit), ThreadType::Voting(VoteSource::Gossip), ), ), (0, true) => ( - verified_vote_receiver.clone(), + gossip_vote_receiver.clone(), UnprocessedTransactionStorage::new_vote_storage( latest_unprocessed_votes.clone(), VoteSource::Gossip, ), ), (1, false) => ( - tpu_verified_vote_receiver.clone(), + tpu_vote_receiver.clone(), UnprocessedTransactionStorage::new_transaction_storage( UnprocessedPacketBatches::with_capacity(batch_limit), ThreadType::Voting(VoteSource::Tpu), ), ), (1, true) => ( - tpu_verified_vote_receiver.clone(), + tpu_vote_receiver.clone(), UnprocessedTransactionStorage::new_vote_storage( latest_unprocessed_votes.clone(), VoteSource::Tpu, ), ), _ => ( - verified_receiver.clone(), + non_vote_receiver.clone(), UnprocessedTransactionStorage::new_transaction_storage( UnprocessedPacketBatches::with_capacity(batch_limit), ThreadType::Transactions, @@ -482,7 +482,7 @@ impl BankingStage { ), }; - let mut packet_deserializer = PacketDeserializer::new(verified_receiver); + let mut packet_deserializer = PacketDeserializer::new(packet_receiver); let poh_recorder = poh_recorder.clone(); let cluster_info = cluster_info.clone(); let mut recv_start = Instant::now(); @@ -2021,9 +2021,9 @@ mod tests { let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); - let (verified_sender, verified_receiver) = unbounded(); - let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); + let (non_vote_sender, non_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); + let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -2039,18 +2039,18 @@ mod tests { let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, - verified_receiver, + non_vote_receiver, tpu_vote_receiver, - gossip_verified_vote_receiver, + gossip_vote_receiver, None, replay_vote_sender, None, Arc::new(ConnectionCache::default()), bank_forks, ); - drop(verified_sender); - drop(gossip_verified_vote_sender); + drop(non_vote_sender); drop(tpu_vote_sender); + drop(gossip_vote_sender); exit.store(true, Ordering::Relaxed); banking_stage.join().unwrap(); poh_service.join().unwrap(); @@ -2070,8 +2070,9 @@ mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let start_hash = bank.last_blockhash(); - let (verified_sender, verified_receiver) = unbounded(); + let (non_vote_sender, non_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); + let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -2086,15 +2087,14 @@ mod tests { create_test_recorder(&bank, &blockstore, Some(poh_config), None); let cluster_info = new_test_cluster_info(Node::new_localhost().info); let cluster_info = Arc::new(cluster_info); - let (verified_gossip_vote_sender, verified_gossip_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, - verified_receiver, + non_vote_receiver, tpu_vote_receiver, - verified_gossip_vote_receiver, + gossip_vote_receiver, None, replay_vote_sender, None, @@ -2102,9 +2102,9 @@ mod tests { bank_forks, ); trace!("sending bank"); - drop(verified_sender); - drop(verified_gossip_vote_sender); + drop(non_vote_sender); drop(tpu_vote_sender); + drop(gossip_vote_sender); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); drop(poh_recorder); @@ -2146,9 +2146,9 @@ mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let start_hash = bank.last_blockhash(); - let (verified_sender, verified_receiver) = unbounded(); + let (non_vote_sender, non_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); + let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -2170,9 +2170,9 @@ mod tests { let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, - verified_receiver, + non_vote_receiver, tpu_vote_receiver, - gossip_verified_vote_receiver, + gossip_vote_receiver, None, replay_vote_sender, None, @@ -2210,13 +2210,13 @@ mod tests { .map(|batch| (batch, vec![0u8, 1u8, 1u8])) .collect(); let packet_batches = convert_from_old_verified(packet_batches); - verified_sender // no_ver, anf, tx + non_vote_sender // no_ver, anf, tx .send((packet_batches, None)) .unwrap(); - drop(verified_sender); + drop(non_vote_sender); drop(tpu_vote_sender); - drop(gossip_verified_vote_sender); + drop(gossip_vote_sender); // wait until banking_stage to finish up all packets banking_stage.join().unwrap(); @@ -2270,7 +2270,7 @@ mod tests { mint_keypair, .. } = create_slow_genesis_config(2); - let (verified_sender, verified_receiver) = unbounded(); + let (non_vote_sender, non_vote_receiver) = unbounded(); // Process a batch that includes a transaction that receives two lamports. let alice = Keypair::new(); @@ -2283,7 +2283,7 @@ mod tests { .map(|batch| (batch, vec![1u8])) .collect(); let packet_batches = convert_from_old_verified(packet_batches); - verified_sender.send((packet_batches, None)).unwrap(); + non_vote_sender.send((packet_batches, None)).unwrap(); // Process a second batch that uses the same from account, so conflicts with above TX let tx = @@ -2294,10 +2294,10 @@ mod tests { .map(|batch| (batch, vec![1u8])) .collect(); let packet_batches = convert_from_old_verified(packet_batches); - verified_sender.send((packet_batches, None)).unwrap(); + non_vote_sender.send((packet_batches, None)).unwrap(); - let (vote_sender, vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); + let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); @@ -2324,9 +2324,9 @@ mod tests { let _banking_stage = BankingStage::new_num_threads( &cluster_info, &poh_recorder, - verified_receiver, + non_vote_receiver, tpu_vote_receiver, - vote_receiver, + gossip_vote_receiver, 3, None, replay_vote_sender, @@ -2343,9 +2343,9 @@ mod tests { poh_service.join().unwrap(); entry_receiver }; - drop(verified_sender); - drop(vote_sender); + drop(non_vote_sender); drop(tpu_vote_sender); + drop(gossip_vote_sender); // consume the entire entry_receiver, feed it into a new bank // check that the balance is what we expect. @@ -4075,9 +4075,9 @@ mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let start_hash = bank.last_blockhash(); - let (verified_sender, verified_receiver) = unbounded(); + let (non_vote_sender, non_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); + let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -4099,9 +4099,9 @@ mod tests { let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, - verified_receiver, + non_vote_receiver, tpu_vote_receiver, - gossip_verified_vote_receiver, + gossip_vote_receiver, None, replay_vote_sender, None, @@ -4167,15 +4167,15 @@ mod tests { }) .collect_vec(); + let non_vote_packet_batches = to_packet_batches(&txs, 10); let tpu_packet_batches = to_packet_batches(&tpu_votes, 10); let gossip_packet_batches = to_packet_batches(&gossip_votes, 10); - let tx_packet_batches = to_packet_batches(&txs, 10); // Send em all [ - (tpu_packet_batches, tpu_vote_sender.clone()), - (gossip_packet_batches, gossip_verified_vote_sender.clone()), - (tx_packet_batches, verified_sender.clone()), + (non_vote_packet_batches, non_vote_sender), + (tpu_packet_batches, tpu_vote_sender), + (gossip_packet_batches, gossip_vote_sender), ] .into_iter() .map(|(packet_batches, sender)| { @@ -4185,9 +4185,6 @@ mod tests { }) .for_each(|handle| handle.join().unwrap()); - drop(verified_sender); - drop(tpu_vote_sender); - drop(gossip_verified_vote_sender); banking_stage.join().unwrap(); exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 2f692bc226..fd8030703f 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -154,7 +154,7 @@ impl Tpu { "Vote", ); - let (verified_sender, verified_receiver) = unbounded(); + let (non_vote_sender, non_vote_receiver) = unbounded(); let stats = Arc::new(StreamStats::default()); let (_, tpu_quic_t) = spawn_server( @@ -188,15 +188,14 @@ impl Tpu { .unwrap(); let sigverify_stage = { - let verifier = TransactionSigVerifier::new(verified_sender); + let verifier = TransactionSigVerifier::new(non_vote_sender); SigVerifyStage::new(find_packet_sender_stake_receiver, verifier, "tpu-verifier") }; - let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let vote_sigverify_stage = { - let verifier = - TransactionSigVerifier::new_reject_non_vote(verified_tpu_vote_packets_sender); + let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender); SigVerifyStage::new( vote_find_packet_sender_stake_receiver, verifier, @@ -204,12 +203,11 @@ impl Tpu { ) }; - let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) = - unbounded(); + let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( exit.clone(), cluster_info.clone(), - verified_gossip_vote_packets_sender, + gossip_vote_sender, poh_recorder.clone(), vote_tracker, bank_forks.clone(), @@ -225,9 +223,9 @@ impl Tpu { let banking_stage = BankingStage::new( cluster_info, poh_recorder, - verified_receiver, - verified_tpu_vote_packets_receiver, - verified_gossip_vote_packets_receiver, + non_vote_receiver, + tpu_vote_receiver, + gossip_vote_receiver, transaction_status_sender, replay_vote_sender, log_messages_bytes_limit,