From f2187780d21630bd922c0ae214c921d8f7d9f522 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 18 Apr 2019 11:18:49 -0700 Subject: [PATCH] Do not forward vote transactions (#3871) --- core/src/banking_stage.rs | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 389c16c4e0..ffdaa0cae2 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -84,10 +84,11 @@ impl BankingStage { // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { - let verified_receiver = if i < num_threads - 1 { - verified_receiver.clone() + let (verified_receiver, enable_forwarding) = if i < num_threads - 1 { + (verified_receiver.clone(), true) } else { - verified_vote_receiver.clone() + // Disable forwarding of vote transactions, as votes are gossiped + (verified_vote_receiver.clone(), false) }; let poh_recorder = poh_recorder.clone(); @@ -102,6 +103,7 @@ impl BankingStage { &poh_recorder, &cluster_info, &mut recv_start, + enable_forwarding, ); exit.store(true, Ordering::Relaxed); }) @@ -205,6 +207,7 @@ impl BankingStage { poh_recorder: &Arc>, cluster_info: &Arc>, buffered_packets: &[(Packets, usize, Vec)], + enable_forwarding: bool, ) -> Result { let rcluster_info = cluster_info.read().unwrap(); @@ -219,11 +222,13 @@ impl BankingStage { Self::process_buffered_packets(poh_recorder, buffered_packets) } BufferedPacketsDecision::Forward => { - let _ = Self::forward_unprocessed_packets( - &socket, - &rcluster_info.leader_data().unwrap().tpu_via_blobs, - &buffered_packets, - ); + if enable_forwarding { + let _ = Self::forward_unprocessed_packets( + &socket, + &rcluster_info.leader_data().unwrap().tpu_via_blobs, + &buffered_packets, + ); + } Ok(vec![]) } _ => Ok(buffered_packets.to_vec()), @@ -256,6 +261,7 @@ impl BankingStage { poh_recorder: &Arc>, cluster_info: &Arc>, recv_start: &mut Instant, + enable_forwarding: bool, ) { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = vec![]; @@ -266,6 +272,7 @@ impl BankingStage { poh_recorder, cluster_info, &buffered_packets, + enable_forwarding, ) .map(|packets| buffered_packets = packets) .unwrap_or_else(|_| buffered_packets.clear()); @@ -295,12 +302,14 @@ impl BankingStage { continue; } - if let Some(leader) = cluster_info.read().unwrap().leader_data() { - let _ = Self::forward_unprocessed_packets( - &socket, - &leader.tpu_via_blobs, - &unprocessed_packets, - ); + if enable_forwarding { + if let Some(leader) = cluster_info.read().unwrap().leader_data() { + let _ = Self::forward_unprocessed_packets( + &socket, + &leader.tpu_via_blobs, + &unprocessed_packets, + ); + } } } Err(err) => {