From ee35ed5250ecdbe4c44282ff34549d8b3f16a2fe Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Sat, 13 Apr 2019 23:19:54 -0700 Subject: [PATCH] Refactored buffered packet forwarding code (#3750) - Added unit tests - Don't consume packets if bank is not known --- core/src/banking_stage.rs | 114 +++++++++++++++++++++++++++++--------- 1 file changed, 88 insertions(+), 26 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 1b4add1ff..1bc5707ae 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -3,6 +3,7 @@ //! can do its processing in parallel with signature verification on the GPU. use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; +use crate::contact_info::ContactInfo; use crate::entry; use crate::entry::{hash_transactions, Entry}; use crate::leader_schedule_utils; @@ -40,6 +41,13 @@ pub struct BankingStage { bank_thread_hdls: Vec>, } +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum BufferedPacketsDecision { + Consume, + Forward, + Hold, +} + impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. #[allow(clippy::new_ret_no_self)] @@ -151,6 +159,30 @@ impl BankingStage { Ok(unprocessed_packets) } + fn process_or_forward_packets( + leader_data: Option<&ContactInfo>, + bank_is_available: bool, + my_id: &Pubkey, + ) -> BufferedPacketsDecision { + leader_data.map_or( + // If leader is not known, return the buffered packets as is + BufferedPacketsDecision::Hold, + // else process the packets + |x| { + if bank_is_available { + // If the bank is available, this node is the leader + BufferedPacketsDecision::Consume + } else if x.id != *my_id { + // If the current node is not the leader, forward the buffered packets + BufferedPacketsDecision::Forward + } else { + // We don't know the leader. Hold the packets for now + BufferedPacketsDecision::Hold + } + }, + ) + } + fn handle_buffered_packets( socket: &std::net::UdpSocket, poh_recorder: &Arc>, @@ -159,34 +191,26 @@ impl BankingStage { ) -> Result { let rcluster_info = cluster_info.read().unwrap(); - // If there's a bank, and leader is available, this node "is" the leader - // process the buffered packets - if poh_recorder.lock().unwrap().bank().is_some() { - if rcluster_info.leader_data().is_some() { - return Self::process_buffered_packets(poh_recorder, buffered_packets); + let decision = Self::process_or_forward_packets( + rcluster_info.leader_data(), + poh_recorder.lock().unwrap().bank().is_some(), + &rcluster_info.id(), + ); + + match decision { + BufferedPacketsDecision::Consume => { + Self::process_buffered_packets(poh_recorder, buffered_packets) } - - return Ok(buffered_packets.to_vec()); + BufferedPacketsDecision::Forward => { + let _ = Self::forward_unprocessed_packets( + &socket, + &rcluster_info.leader_data().unwrap().tpu_via_blobs, + &buffered_packets, + ); + Ok(vec![]) + } + _ => Ok(buffered_packets.to_vec()), } - - // If leader is not known, return the buffered packets as is - // else process the packets - rcluster_info - .leader_data() - .map_or(Ok(buffered_packets.to_vec()), |x| { - if x.id == rcluster_info.id() { - // If the current node is the leader, process the buffered packets - Self::process_buffered_packets(poh_recorder, buffered_packets) - } else { - // If the current node is not the leader, forward the buffered packets - let _ = Self::forward_unprocessed_packets( - &socket, - &rcluster_info.leader_data().unwrap().tpu_via_blobs, - &buffered_packets, - ); - Ok(vec![]) - } - }) } fn should_buffer_packets( @@ -880,6 +904,44 @@ mod tests { Blocktree::destroy(&ledger_path).unwrap(); } + #[test] + fn test_should_process_or_forward_packets() { + let my_id = Pubkey::new_rand(); + let my_id1 = Pubkey::new_rand(); + + assert_eq!( + BankingStage::process_or_forward_packets(None, true, &my_id), + BufferedPacketsDecision::Hold + ); + assert_eq!( + BankingStage::process_or_forward_packets(None, false, &my_id), + BufferedPacketsDecision::Hold + ); + assert_eq!( + BankingStage::process_or_forward_packets(None, false, &my_id1), + BufferedPacketsDecision::Hold + ); + + let mut contact_info = ContactInfo::default(); + contact_info.id = my_id1; + assert_eq!( + BankingStage::process_or_forward_packets(Some(&contact_info), false, &my_id), + BufferedPacketsDecision::Forward + ); + assert_eq!( + BankingStage::process_or_forward_packets(Some(&contact_info), true, &my_id), + BufferedPacketsDecision::Consume + ); + assert_eq!( + BankingStage::process_or_forward_packets(Some(&contact_info), false, &my_id1), + BufferedPacketsDecision::Hold + ); + assert_eq!( + BankingStage::process_or_forward_packets(Some(&contact_info), true, &my_id1), + BufferedPacketsDecision::Consume + ); + } + #[test] fn test_bank_process_and_record_transactions() { solana_logger::setup();