Refactored buffered packet forwarding code (#3750)

- Added unit tests
- Don't consume packets if bank is not known
This commit is contained in:
Pankaj Garg 2019-04-13 23:19:54 -07:00 committed by GitHub
parent 92b5e131fe
commit ee35ed5250
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 88 additions and 26 deletions

View File

@ -3,6 +3,7 @@
//! can do its processing in parallel with signature verification on the GPU.
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::contact_info::ContactInfo;
use crate::entry;
use crate::entry::{hash_transactions, Entry};
use crate::leader_schedule_utils;
@ -40,6 +41,13 @@ pub struct BankingStage {
bank_thread_hdls: Vec<JoinHandle<()>>,
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BufferedPacketsDecision {
Consume,
Forward,
Hold,
}
impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
#[allow(clippy::new_ret_no_self)]
@ -151,6 +159,30 @@ impl BankingStage {
Ok(unprocessed_packets)
}
fn process_or_forward_packets(
leader_data: Option<&ContactInfo>,
bank_is_available: bool,
my_id: &Pubkey,
) -> BufferedPacketsDecision {
leader_data.map_or(
// If leader is not known, return the buffered packets as is
BufferedPacketsDecision::Hold,
// else process the packets
|x| {
if bank_is_available {
// If the bank is available, this node is the leader
BufferedPacketsDecision::Consume
} else if x.id != *my_id {
// If the current node is not the leader, forward the buffered packets
BufferedPacketsDecision::Forward
} else {
// We don't know the leader. Hold the packets for now
BufferedPacketsDecision::Hold
}
},
)
}
fn handle_buffered_packets(
socket: &std::net::UdpSocket,
poh_recorder: &Arc<Mutex<PohRecorder>>,
@ -159,34 +191,26 @@ impl BankingStage {
) -> Result<UnprocessedPackets> {
let rcluster_info = cluster_info.read().unwrap();
// If there's a bank, and leader is available, this node "is" the leader
// process the buffered packets
if poh_recorder.lock().unwrap().bank().is_some() {
if rcluster_info.leader_data().is_some() {
return Self::process_buffered_packets(poh_recorder, buffered_packets);
let decision = Self::process_or_forward_packets(
rcluster_info.leader_data(),
poh_recorder.lock().unwrap().bank().is_some(),
&rcluster_info.id(),
);
match decision {
BufferedPacketsDecision::Consume => {
Self::process_buffered_packets(poh_recorder, buffered_packets)
}
return Ok(buffered_packets.to_vec());
BufferedPacketsDecision::Forward => {
let _ = Self::forward_unprocessed_packets(
&socket,
&rcluster_info.leader_data().unwrap().tpu_via_blobs,
&buffered_packets,
);
Ok(vec![])
}
_ => Ok(buffered_packets.to_vec()),
}
// If leader is not known, return the buffered packets as is
// else process the packets
rcluster_info
.leader_data()
.map_or(Ok(buffered_packets.to_vec()), |x| {
if x.id == rcluster_info.id() {
// If the current node is the leader, process the buffered packets
Self::process_buffered_packets(poh_recorder, buffered_packets)
} else {
// If the current node is not the leader, forward the buffered packets
let _ = Self::forward_unprocessed_packets(
&socket,
&rcluster_info.leader_data().unwrap().tpu_via_blobs,
&buffered_packets,
);
Ok(vec![])
}
})
}
fn should_buffer_packets(
@ -880,6 +904,44 @@ mod tests {
Blocktree::destroy(&ledger_path).unwrap();
}
#[test]
fn test_should_process_or_forward_packets() {
let my_id = Pubkey::new_rand();
let my_id1 = Pubkey::new_rand();
assert_eq!(
BankingStage::process_or_forward_packets(None, true, &my_id),
BufferedPacketsDecision::Hold
);
assert_eq!(
BankingStage::process_or_forward_packets(None, false, &my_id),
BufferedPacketsDecision::Hold
);
assert_eq!(
BankingStage::process_or_forward_packets(None, false, &my_id1),
BufferedPacketsDecision::Hold
);
let mut contact_info = ContactInfo::default();
contact_info.id = my_id1;
assert_eq!(
BankingStage::process_or_forward_packets(Some(&contact_info), false, &my_id),
BufferedPacketsDecision::Forward
);
assert_eq!(
BankingStage::process_or_forward_packets(Some(&contact_info), true, &my_id),
BufferedPacketsDecision::Consume
);
assert_eq!(
BankingStage::process_or_forward_packets(Some(&contact_info), false, &my_id1),
BufferedPacketsDecision::Hold
);
assert_eq!(
BankingStage::process_or_forward_packets(Some(&contact_info), true, &my_id1),
BufferedPacketsDecision::Consume
);
}
#[test]
fn test_bank_process_and_record_transactions() {
solana_logger::setup();