Rename banking stage packet receivers consistently (#29752)

Rename banking stage batch receivers consistently
This commit is contained in:
Ryo Onodera 2023-01-19 10:04:55 +09:00 committed by GitHub
parent 2c728bb172
commit 4973fe18f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 75 additions and 79 deletions

View File

@ -321,9 +321,6 @@ fn main() {
.. ..
} = create_genesis_config(mint_total); } = create_genesis_config(mint_total);
let (verified_sender, verified_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let bank0 = Bank::new_for_benches(&genesis_config); let bank0 = Bank::new_for_benches(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0)));
@ -410,6 +407,9 @@ fn main() {
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let (exit, poh_recorder, poh_service, signal_receiver) = let (exit, poh_recorder, poh_service, signal_receiver) =
create_test_recorder(&bank, &blockstore, None, Some(leader_schedule_cache)); create_test_recorder(&bank, &blockstore, None, Some(leader_schedule_cache));
let (non_vote_sender, non_vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (gossip_vote_sender, gossip_vote_receiver) = unbounded();
let cluster_info = ClusterInfo::new( let cluster_info = ClusterInfo::new(
Node::new_localhost().info, Node::new_localhost().info,
Arc::new(Keypair::new()), Arc::new(Keypair::new()),
@ -424,9 +424,9 @@ fn main() {
let banking_stage = BankingStage::new_num_threads( let banking_stage = BankingStage::new_num_threads(
&cluster_info, &cluster_info,
&poh_recorder, &poh_recorder,
verified_receiver, non_vote_receiver,
tpu_vote_receiver, tpu_vote_receiver,
vote_receiver, gossip_vote_receiver,
num_banking_threads, num_banking_threads,
None, None,
replay_vote_sender, replay_vote_sender,
@ -461,7 +461,7 @@ fn main() {
packet_batch_index, packet_batch_index,
timestamp(), timestamp(),
); );
verified_sender non_vote_sender
.send((vec![packet_batch.clone()], None)) .send((vec![packet_batch.clone()], None))
.unwrap(); .unwrap();
} }
@ -574,9 +574,9 @@ fn main() {
(1000.0 * 1000.0 * (txs_processed - base_tx_count) as f64) / (total_us as f64), (1000.0 * 1000.0 * (txs_processed - base_tx_count) as f64) / (total_us as f64),
); );
drop(verified_sender); drop(non_vote_sender);
drop(tpu_vote_sender); drop(tpu_vote_sender);
drop(vote_sender); drop(gossip_vote_sender);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
banking_stage.join().unwrap(); banking_stage.join().unwrap();
debug!("waited for banking_stage"); debug!("waited for banking_stage");

View File

@ -197,9 +197,10 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
// during the benchmark // during the benchmark
genesis_config.ticks_per_slot = 10_000; genesis_config.ticks_per_slot = 10_000;
let (verified_sender, verified_receiver) = unbounded(); let (non_vote_sender, non_vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded(); let (gossip_vote_sender, gossip_vote_receiver) = unbounded();
let mut bank = Bank::new_for_benches(&genesis_config); let mut bank = Bank::new_for_benches(&genesis_config);
// Allow arbitrary transaction processing time for the purposes of this bench // Allow arbitrary transaction processing time for the purposes of this bench
bank.ns_per_slot = u128::MAX; bank.ns_per_slot = u128::MAX;
@ -279,9 +280,9 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
let _banking_stage = BankingStage::new( let _banking_stage = BankingStage::new(
&cluster_info, &cluster_info,
&poh_recorder, &poh_recorder,
verified_receiver, non_vote_receiver,
tpu_vote_receiver, tpu_vote_receiver,
vote_receiver, gossip_vote_receiver,
None, None,
s, s,
None, None,
@ -305,7 +306,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
tpu_vote_sender tpu_vote_sender
.send((vote_packets[start..start + chunk_len].to_vec(), None)) .send((vote_packets[start..start + chunk_len].to_vec(), None))
.unwrap(); .unwrap();
vote_sender gossip_vote_sender
.send((vote_packets[start..start + chunk_len].to_vec(), None)) .send((vote_packets[start..start + chunk_len].to_vec(), None))
.unwrap(); .unwrap();
} }
@ -320,7 +321,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
for xv in v { for xv in v {
sent += xv.len(); sent += xv.len();
} }
verified_sender.send((v.to_vec(), None)).unwrap(); non_vote_sender.send((v.to_vec(), None)).unwrap();
} }
check_txs(&signal_receiver2, txes / CHUNKS); check_txs(&signal_receiver2, txes / CHUNKS);

View File

@ -385,9 +385,9 @@ impl BankingStage {
pub fn new( pub fn new(
cluster_info: &Arc<ClusterInfo>, cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<RwLock<PohRecorder>>, poh_recorder: &Arc<RwLock<PohRecorder>>,
verified_receiver: BankingPacketReceiver, non_vote_receiver: BankingPacketReceiver,
tpu_verified_vote_receiver: BankingPacketReceiver, tpu_vote_receiver: BankingPacketReceiver,
verified_vote_receiver: BankingPacketReceiver, gossip_vote_receiver: BankingPacketReceiver,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender, replay_vote_sender: ReplayVoteSender,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
@ -397,9 +397,9 @@ impl BankingStage {
Self::new_num_threads( Self::new_num_threads(
cluster_info, cluster_info,
poh_recorder, poh_recorder,
verified_receiver, non_vote_receiver,
tpu_verified_vote_receiver, tpu_vote_receiver,
verified_vote_receiver, gossip_vote_receiver,
Self::num_threads(), Self::num_threads(),
transaction_status_sender, transaction_status_sender,
replay_vote_sender, replay_vote_sender,
@ -413,9 +413,9 @@ impl BankingStage {
pub fn new_num_threads( pub fn new_num_threads(
cluster_info: &Arc<ClusterInfo>, cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<RwLock<PohRecorder>>, poh_recorder: &Arc<RwLock<PohRecorder>>,
verified_receiver: BankingPacketReceiver, non_vote_receiver: BankingPacketReceiver,
tpu_verified_vote_receiver: BankingPacketReceiver, tpu_vote_receiver: BankingPacketReceiver,
verified_vote_receiver: BankingPacketReceiver, gossip_vote_receiver: BankingPacketReceiver,
num_threads: u32, num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender, replay_vote_sender: ReplayVoteSender,
@ -443,38 +443,38 @@ impl BankingStage {
// Many banks that process transactions in parallel. // Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads) let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|i| { .map(|i| {
let (verified_receiver, unprocessed_transaction_storage) = let (packet_receiver, unprocessed_transaction_storage) =
match (i, should_split_voting_threads) { match (i, should_split_voting_threads) {
(0, false) => ( (0, false) => (
verified_vote_receiver.clone(), gossip_vote_receiver.clone(),
UnprocessedTransactionStorage::new_transaction_storage( UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::with_capacity(batch_limit), UnprocessedPacketBatches::with_capacity(batch_limit),
ThreadType::Voting(VoteSource::Gossip), ThreadType::Voting(VoteSource::Gossip),
), ),
), ),
(0, true) => ( (0, true) => (
verified_vote_receiver.clone(), gossip_vote_receiver.clone(),
UnprocessedTransactionStorage::new_vote_storage( UnprocessedTransactionStorage::new_vote_storage(
latest_unprocessed_votes.clone(), latest_unprocessed_votes.clone(),
VoteSource::Gossip, VoteSource::Gossip,
), ),
), ),
(1, false) => ( (1, false) => (
tpu_verified_vote_receiver.clone(), tpu_vote_receiver.clone(),
UnprocessedTransactionStorage::new_transaction_storage( UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::with_capacity(batch_limit), UnprocessedPacketBatches::with_capacity(batch_limit),
ThreadType::Voting(VoteSource::Tpu), ThreadType::Voting(VoteSource::Tpu),
), ),
), ),
(1, true) => ( (1, true) => (
tpu_verified_vote_receiver.clone(), tpu_vote_receiver.clone(),
UnprocessedTransactionStorage::new_vote_storage( UnprocessedTransactionStorage::new_vote_storage(
latest_unprocessed_votes.clone(), latest_unprocessed_votes.clone(),
VoteSource::Tpu, VoteSource::Tpu,
), ),
), ),
_ => ( _ => (
verified_receiver.clone(), non_vote_receiver.clone(),
UnprocessedTransactionStorage::new_transaction_storage( UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::with_capacity(batch_limit), UnprocessedPacketBatches::with_capacity(batch_limit),
ThreadType::Transactions, ThreadType::Transactions,
@ -482,7 +482,7 @@ impl BankingStage {
), ),
}; };
let mut packet_deserializer = PacketDeserializer::new(verified_receiver); let mut packet_deserializer = PacketDeserializer::new(packet_receiver);
let poh_recorder = poh_recorder.clone(); let poh_recorder = poh_recorder.clone();
let cluster_info = cluster_info.clone(); let cluster_info = cluster_info.clone();
let mut recv_start = Instant::now(); let mut recv_start = Instant::now();
@ -2021,9 +2021,9 @@ mod tests {
let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap());
let (verified_sender, verified_receiver) = unbounded(); let (non_vote_sender, non_vote_receiver) = unbounded();
let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (gossip_vote_sender, gossip_vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path_auto_delete!(); let ledger_path = get_tmp_ledger_path_auto_delete!();
{ {
let blockstore = Arc::new( let blockstore = Arc::new(
@ -2039,18 +2039,18 @@ mod tests {
let banking_stage = BankingStage::new( let banking_stage = BankingStage::new(
&cluster_info, &cluster_info,
&poh_recorder, &poh_recorder,
verified_receiver, non_vote_receiver,
tpu_vote_receiver, tpu_vote_receiver,
gossip_verified_vote_receiver, gossip_vote_receiver,
None, None,
replay_vote_sender, replay_vote_sender,
None, None,
Arc::new(ConnectionCache::default()), Arc::new(ConnectionCache::default()),
bank_forks, bank_forks,
); );
drop(verified_sender); drop(non_vote_sender);
drop(gossip_verified_vote_sender);
drop(tpu_vote_sender); drop(tpu_vote_sender);
drop(gossip_vote_sender);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
banking_stage.join().unwrap(); banking_stage.join().unwrap();
poh_service.join().unwrap(); poh_service.join().unwrap();
@ -2070,8 +2070,9 @@ mod tests {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap());
let start_hash = bank.last_blockhash(); let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = unbounded(); let (non_vote_sender, non_vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (gossip_vote_sender, gossip_vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path_auto_delete!(); let ledger_path = get_tmp_ledger_path_auto_delete!();
{ {
let blockstore = Arc::new( let blockstore = Arc::new(
@ -2086,15 +2087,14 @@ mod tests {
create_test_recorder(&bank, &blockstore, Some(poh_config), None); create_test_recorder(&bank, &blockstore, Some(poh_config), None);
let cluster_info = new_test_cluster_info(Node::new_localhost().info); let cluster_info = new_test_cluster_info(Node::new_localhost().info);
let cluster_info = Arc::new(cluster_info); let cluster_info = Arc::new(cluster_info);
let (verified_gossip_vote_sender, verified_gossip_vote_receiver) = unbounded();
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let banking_stage = BankingStage::new( let banking_stage = BankingStage::new(
&cluster_info, &cluster_info,
&poh_recorder, &poh_recorder,
verified_receiver, non_vote_receiver,
tpu_vote_receiver, tpu_vote_receiver,
verified_gossip_vote_receiver, gossip_vote_receiver,
None, None,
replay_vote_sender, replay_vote_sender,
None, None,
@ -2102,9 +2102,9 @@ mod tests {
bank_forks, bank_forks,
); );
trace!("sending bank"); trace!("sending bank");
drop(verified_sender); drop(non_vote_sender);
drop(verified_gossip_vote_sender);
drop(tpu_vote_sender); drop(tpu_vote_sender);
drop(gossip_vote_sender);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap(); poh_service.join().unwrap();
drop(poh_recorder); drop(poh_recorder);
@ -2146,9 +2146,9 @@ mod tests {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap());
let start_hash = bank.last_blockhash(); let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = unbounded(); let (non_vote_sender, non_vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); let (gossip_vote_sender, gossip_vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path_auto_delete!(); let ledger_path = get_tmp_ledger_path_auto_delete!();
{ {
let blockstore = Arc::new( let blockstore = Arc::new(
@ -2170,9 +2170,9 @@ mod tests {
let banking_stage = BankingStage::new( let banking_stage = BankingStage::new(
&cluster_info, &cluster_info,
&poh_recorder, &poh_recorder,
verified_receiver, non_vote_receiver,
tpu_vote_receiver, tpu_vote_receiver,
gossip_verified_vote_receiver, gossip_vote_receiver,
None, None,
replay_vote_sender, replay_vote_sender,
None, None,
@ -2210,13 +2210,13 @@ mod tests {
.map(|batch| (batch, vec![0u8, 1u8, 1u8])) .map(|batch| (batch, vec![0u8, 1u8, 1u8]))
.collect(); .collect();
let packet_batches = convert_from_old_verified(packet_batches); let packet_batches = convert_from_old_verified(packet_batches);
verified_sender // no_ver, anf, tx non_vote_sender // no_ver, anf, tx
.send((packet_batches, None)) .send((packet_batches, None))
.unwrap(); .unwrap();
drop(verified_sender); drop(non_vote_sender);
drop(tpu_vote_sender); drop(tpu_vote_sender);
drop(gossip_verified_vote_sender); drop(gossip_vote_sender);
// wait until banking_stage to finish up all packets // wait until banking_stage to finish up all packets
banking_stage.join().unwrap(); banking_stage.join().unwrap();
@ -2270,7 +2270,7 @@ mod tests {
mint_keypair, mint_keypair,
.. ..
} = create_slow_genesis_config(2); } = create_slow_genesis_config(2);
let (verified_sender, verified_receiver) = unbounded(); let (non_vote_sender, non_vote_receiver) = unbounded();
// Process a batch that includes a transaction that receives two lamports. // Process a batch that includes a transaction that receives two lamports.
let alice = Keypair::new(); let alice = Keypair::new();
@ -2283,7 +2283,7 @@ mod tests {
.map(|batch| (batch, vec![1u8])) .map(|batch| (batch, vec![1u8]))
.collect(); .collect();
let packet_batches = convert_from_old_verified(packet_batches); let packet_batches = convert_from_old_verified(packet_batches);
verified_sender.send((packet_batches, None)).unwrap(); non_vote_sender.send((packet_batches, None)).unwrap();
// Process a second batch that uses the same from account, so conflicts with above TX // Process a second batch that uses the same from account, so conflicts with above TX
let tx = let tx =
@ -2294,10 +2294,10 @@ mod tests {
.map(|batch| (batch, vec![1u8])) .map(|batch| (batch, vec![1u8]))
.collect(); .collect();
let packet_batches = convert_from_old_verified(packet_batches); let packet_batches = convert_from_old_verified(packet_batches);
verified_sender.send((packet_batches, None)).unwrap(); non_vote_sender.send((packet_batches, None)).unwrap();
let (vote_sender, vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (gossip_vote_sender, gossip_vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path_auto_delete!(); let ledger_path = get_tmp_ledger_path_auto_delete!();
{ {
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
@ -2324,9 +2324,9 @@ mod tests {
let _banking_stage = BankingStage::new_num_threads( let _banking_stage = BankingStage::new_num_threads(
&cluster_info, &cluster_info,
&poh_recorder, &poh_recorder,
verified_receiver, non_vote_receiver,
tpu_vote_receiver, tpu_vote_receiver,
vote_receiver, gossip_vote_receiver,
3, 3,
None, None,
replay_vote_sender, replay_vote_sender,
@ -2343,9 +2343,9 @@ mod tests {
poh_service.join().unwrap(); poh_service.join().unwrap();
entry_receiver entry_receiver
}; };
drop(verified_sender); drop(non_vote_sender);
drop(vote_sender);
drop(tpu_vote_sender); drop(tpu_vote_sender);
drop(gossip_vote_sender);
// consume the entire entry_receiver, feed it into a new bank // consume the entire entry_receiver, feed it into a new bank
// check that the balance is what we expect. // check that the balance is what we expect.
@ -4075,9 +4075,9 @@ mod tests {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap());
let start_hash = bank.last_blockhash(); let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = unbounded(); let (non_vote_sender, non_vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); let (gossip_vote_sender, gossip_vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path_auto_delete!(); let ledger_path = get_tmp_ledger_path_auto_delete!();
{ {
let blockstore = Arc::new( let blockstore = Arc::new(
@ -4099,9 +4099,9 @@ mod tests {
let banking_stage = BankingStage::new( let banking_stage = BankingStage::new(
&cluster_info, &cluster_info,
&poh_recorder, &poh_recorder,
verified_receiver, non_vote_receiver,
tpu_vote_receiver, tpu_vote_receiver,
gossip_verified_vote_receiver, gossip_vote_receiver,
None, None,
replay_vote_sender, replay_vote_sender,
None, None,
@ -4167,15 +4167,15 @@ mod tests {
}) })
.collect_vec(); .collect_vec();
let non_vote_packet_batches = to_packet_batches(&txs, 10);
let tpu_packet_batches = to_packet_batches(&tpu_votes, 10); let tpu_packet_batches = to_packet_batches(&tpu_votes, 10);
let gossip_packet_batches = to_packet_batches(&gossip_votes, 10); let gossip_packet_batches = to_packet_batches(&gossip_votes, 10);
let tx_packet_batches = to_packet_batches(&txs, 10);
// Send em all // Send em all
[ [
(tpu_packet_batches, tpu_vote_sender.clone()), (non_vote_packet_batches, non_vote_sender),
(gossip_packet_batches, gossip_verified_vote_sender.clone()), (tpu_packet_batches, tpu_vote_sender),
(tx_packet_batches, verified_sender.clone()), (gossip_packet_batches, gossip_vote_sender),
] ]
.into_iter() .into_iter()
.map(|(packet_batches, sender)| { .map(|(packet_batches, sender)| {
@ -4185,9 +4185,6 @@ mod tests {
}) })
.for_each(|handle| handle.join().unwrap()); .for_each(|handle| handle.join().unwrap());
drop(verified_sender);
drop(tpu_vote_sender);
drop(gossip_verified_vote_sender);
banking_stage.join().unwrap(); banking_stage.join().unwrap();
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap(); poh_service.join().unwrap();

View File

@ -154,7 +154,7 @@ impl Tpu {
"Vote", "Vote",
); );
let (verified_sender, verified_receiver) = unbounded(); let (non_vote_sender, non_vote_receiver) = unbounded();
let stats = Arc::new(StreamStats::default()); let stats = Arc::new(StreamStats::default());
let (_, tpu_quic_t) = spawn_server( let (_, tpu_quic_t) = spawn_server(
@ -188,15 +188,14 @@ impl Tpu {
.unwrap(); .unwrap();
let sigverify_stage = { let sigverify_stage = {
let verifier = TransactionSigVerifier::new(verified_sender); let verifier = TransactionSigVerifier::new(non_vote_sender);
SigVerifyStage::new(find_packet_sender_stake_receiver, verifier, "tpu-verifier") SigVerifyStage::new(find_packet_sender_stake_receiver, verifier, "tpu-verifier")
}; };
let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let vote_sigverify_stage = { let vote_sigverify_stage = {
let verifier = let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender);
TransactionSigVerifier::new_reject_non_vote(verified_tpu_vote_packets_sender);
SigVerifyStage::new( SigVerifyStage::new(
vote_find_packet_sender_stake_receiver, vote_find_packet_sender_stake_receiver,
verifier, verifier,
@ -204,12 +203,11 @@ impl Tpu {
) )
}; };
let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) = let (gossip_vote_sender, gossip_vote_receiver) = unbounded();
unbounded();
let cluster_info_vote_listener = ClusterInfoVoteListener::new( let cluster_info_vote_listener = ClusterInfoVoteListener::new(
exit.clone(), exit.clone(),
cluster_info.clone(), cluster_info.clone(),
verified_gossip_vote_packets_sender, gossip_vote_sender,
poh_recorder.clone(), poh_recorder.clone(),
vote_tracker, vote_tracker,
bank_forks.clone(), bank_forks.clone(),
@ -225,9 +223,9 @@ impl Tpu {
let banking_stage = BankingStage::new( let banking_stage = BankingStage::new(
cluster_info, cluster_info,
poh_recorder, poh_recorder,
verified_receiver, non_vote_receiver,
verified_tpu_vote_packets_receiver, tpu_vote_receiver,
verified_gossip_vote_packets_receiver, gossip_vote_receiver,
transaction_status_sender, transaction_status_sender,
replay_vote_sender, replay_vote_sender,
log_messages_bytes_limit, log_messages_bytes_limit,