Don't filter transactions if we are buffering it locally (#4395)

automerge
This commit is contained in:
Pankaj Garg 2019-05-22 17:54:28 -07:00 committed by Grimes
parent 2f976ae460
commit b8f6c17dee
2 changed files with 34 additions and 6 deletions

View File

@ -55,6 +55,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
let (genesis_block, _mint_keypair) = create_genesis_block(100_000); let (genesis_block, _mint_keypair) = create_genesis_block(100_000);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let my_id = Pubkey::new_rand();
{ {
let blocktree = Arc::new( let blocktree = Arc::new(
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
@ -75,7 +76,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
// If the packet buffers are copied, performance will be poor. // If the packet buffers are copied, performance will be poor.
bencher.iter(move || { bencher.iter(move || {
let _ignored = let _ignored =
BankingStage::consume_buffered_packets(&poh_recorder, packets.as_slice()); BankingStage::consume_buffered_packets(&my_id, &poh_recorder, packets.as_slice());
}); });
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);

View File

@ -144,6 +144,7 @@ impl BankingStage {
} }
pub fn consume_buffered_packets( pub fn consume_buffered_packets(
my_id: &Pubkey,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
buffered_packets: &[PacketsAndOffsets], buffered_packets: &[PacketsAndOffsets],
) -> Result<UnprocessedPackets> { ) -> Result<UnprocessedPackets> {
@ -186,10 +187,16 @@ impl BankingStage {
); );
if processed < verified_txs_len { if processed < verified_txs_len {
let next_leader = poh_recorder.lock().unwrap().next_slot_leader();
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() { while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() {
let unprocessed_indexes = let unprocessed_indexes = Self::filter_unprocessed_packets(
Self::filter_unprocessed_packets(&bank, &msgs, &unprocessed_indexes); &bank,
&msgs,
&unprocessed_indexes,
my_id,
next_leader,
);
Self::push_unprocessed( Self::push_unprocessed(
&mut unprocessed_packets, &mut unprocessed_packets,
msgs.to_owned(), msgs.to_owned(),
@ -271,7 +278,7 @@ impl BankingStage {
match decision { match decision {
BufferedPacketsDecision::Consume => { BufferedPacketsDecision::Consume => {
Self::consume_buffered_packets(poh_recorder, buffered_packets) Self::consume_buffered_packets(&cluster_info.id(), poh_recorder, buffered_packets)
} }
BufferedPacketsDecision::Forward => { BufferedPacketsDecision::Forward => {
if enable_forwarding { if enable_forwarding {
@ -334,6 +341,7 @@ impl BankingStage {
&poh_recorder, &poh_recorder,
recv_start, recv_start,
recv_timeout, recv_timeout,
cluster_info,
id, id,
) { ) {
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
@ -654,7 +662,18 @@ impl BankingStage {
bank: &Arc<Bank>, bank: &Arc<Bank>,
msgs: &Packets, msgs: &Packets,
transaction_indexes: &[usize], transaction_indexes: &[usize],
my_id: &Pubkey,
next_leader: Option<Pubkey>,
) -> Vec<usize> { ) -> Vec<usize> {
// Check if we are the next leader. If so, let's not filter the packets
// as we'll filter it again while processing the packets.
// Filtering helps if we were going to forward the packets to some other node
if let Some(leader) = next_leader {
if leader == *my_id {
return transaction_indexes.to_vec();
}
}
let (transactions, transaction_indexes) = let (transactions, transaction_indexes) =
Self::transactions_from_packets(msgs, &transaction_indexes); Self::transactions_from_packets(msgs, &transaction_indexes);
@ -689,6 +708,7 @@ impl BankingStage {
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
recv_start: &mut Instant, recv_start: &mut Instant,
recv_timeout: Duration, recv_timeout: Duration,
cluster_info: &Arc<RwLock<ClusterInfo>>,
id: u32, id: u32,
) -> Result<UnprocessedPackets> { ) -> Result<UnprocessedPackets> {
let mms = verified_receiver let mms = verified_receiver
@ -729,11 +749,18 @@ impl BankingStage {
Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes); Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes);
if processed < verified_txs_len { if processed < verified_txs_len {
let next_leader = poh.lock().unwrap().next_slot_leader();
let my_id = cluster_info.read().unwrap().id();
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
while let Some((msgs, vers)) = mms_iter.next() { while let Some((msgs, vers)) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(vers); let packet_indexes = Self::generate_packet_indexes(vers);
let unprocessed_indexes = let unprocessed_indexes = Self::filter_unprocessed_packets(
Self::filter_unprocessed_packets(&bank, &msgs, &packet_indexes); &bank,
&msgs,
&packet_indexes,
&my_id,
next_leader,
);
Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes); Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes);
} }
} }