From b8f6c17dee05c9dda0c8095b3d026bc1444cbcfd Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Wed, 22 May 2019 17:54:28 -0700 Subject: [PATCH] Don't filter transactions if we are buffering it locally (#4395) automerge --- core/benches/banking_stage.rs | 3 ++- core/src/banking_stage.rs | 37 ++++++++++++++++++++++++++++++----- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 5b051469e..debd76416 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -55,6 +55,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let (genesis_block, _mint_keypair) = create_genesis_block(100_000); let bank = Arc::new(Bank::new(&genesis_block)); let ledger_path = get_tmp_ledger_path!(); + let my_id = Pubkey::new_rand(); { let blocktree = Arc::new( Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), @@ -75,7 +76,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { // If the packet buffers are copied, performance will be poor. bencher.iter(move || { let _ignored = - BankingStage::consume_buffered_packets(&poh_recorder, packets.as_slice()); + BankingStage::consume_buffered_packets(&my_id, &poh_recorder, packets.as_slice()); }); exit.store(true, Ordering::Relaxed); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 881317db3..d688c6224 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -144,6 +144,7 @@ impl BankingStage { } pub fn consume_buffered_packets( + my_id: &Pubkey, poh_recorder: &Arc>, buffered_packets: &[PacketsAndOffsets], ) -> Result { @@ -186,10 +187,16 @@ impl BankingStage { ); if processed < verified_txs_len { + let next_leader = poh_recorder.lock().unwrap().next_slot_leader(); // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() { - let unprocessed_indexes = - Self::filter_unprocessed_packets(&bank, &msgs, &unprocessed_indexes); + let unprocessed_indexes = Self::filter_unprocessed_packets( + &bank, + &msgs, + &unprocessed_indexes, + my_id, + next_leader, + ); Self::push_unprocessed( &mut unprocessed_packets, msgs.to_owned(), @@ -271,7 +278,7 @@ impl BankingStage { match decision { BufferedPacketsDecision::Consume => { - Self::consume_buffered_packets(poh_recorder, buffered_packets) + Self::consume_buffered_packets(&rcluster_info.id(), poh_recorder, buffered_packets) } BufferedPacketsDecision::Forward => { if enable_forwarding { @@ -334,6 +341,7 @@ impl BankingStage { &poh_recorder, recv_start, recv_timeout, + cluster_info, id, ) { Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), @@ -654,7 +662,18 @@ impl BankingStage { bank: &Arc, msgs: &Packets, transaction_indexes: &[usize], + my_id: &Pubkey, + next_leader: Option, ) -> Vec { + // Check if we are the next leader. If so, let's not filter the packets + // as we'll filter it again while processing the packets. + // Filtering helps if we were going to forward the packets to some other node + if let Some(leader) = next_leader { + if leader == *my_id { + return transaction_indexes.to_vec(); + } + } + let (transactions, transaction_indexes) = Self::transactions_from_packets(msgs, &transaction_indexes); @@ -689,6 +708,7 @@ impl BankingStage { poh: &Arc>, recv_start: &mut Instant, recv_timeout: Duration, + cluster_info: &Arc>, id: u32, ) -> Result { let mms = verified_receiver @@ -729,11 +749,18 @@ impl BankingStage { Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes); if processed < verified_txs_len { + let next_leader = poh.lock().unwrap().next_slot_leader(); + let my_id = cluster_info.read().unwrap().id(); // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones while let Some((msgs, vers)) = mms_iter.next() { let packet_indexes = Self::generate_packet_indexes(vers); - let unprocessed_indexes = - Self::filter_unprocessed_packets(&bank, &msgs, &packet_indexes); + let unprocessed_indexes = Self::filter_unprocessed_packets( + &bank, + &msgs, + &packet_indexes, + &my_id, + next_leader, + ); Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes); } }