Buffer unprocessed packets if next leader is the current node

This commit is contained in:
Pankaj Garg 2019-03-08 16:48:15 -08:00
parent 0c592c52f6
commit e3cacb9296
1 changed files with 82 additions and 3 deletions

View File

@ -5,6 +5,7 @@
use crate::cluster_info::ClusterInfo;
use crate::entry::Entry;
use crate::leader_confirmation_service::LeaderConfirmationService;
use crate::leader_schedule_utils;
use crate::packet::Packets;
use crate::packet::SharedPackets;
use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries};
@ -76,32 +77,110 @@ impl BankingStage {
fn forward_unprocessed_packets(
socket: &std::net::UdpSocket,
tpu: &std::net::SocketAddr,
unprocessed_packets: UnprocessedPackets,
unprocessed_packets: &UnprocessedPackets,
) -> std::io::Result<()> {
for (packets, start_index) in unprocessed_packets {
let packets = packets.read().unwrap();
for packet in packets.packets.iter().skip(start_index) {
for packet in packets.packets.iter().skip(*start_index) {
socket.send_to(&packet.data[..packet.meta.size], tpu)?;
}
}
Ok(())
}
fn forward_buffered_packets(
socket: &std::net::UdpSocket,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
buffered_packets: &UnprocessedPackets,
) -> bool {
let (leader, my_id) = {
let rcluster_info = cluster_info.read().unwrap();
let leader_id = if let Some(leader) = rcluster_info.leader_data() {
Some(leader.clone())
} else {
None
};
(leader_id, rcluster_info.id())
};
let bank = poh_recorder.lock().unwrap().bank();
// if the current node is not the leader, forward the buffered packets
if bank.is_none() {
if let Some(leader) = leader.clone() {
if my_id == leader.id {
let _ =
Self::forward_unprocessed_packets(&socket, &leader.tpu, &buffered_packets);
return true;
}
}
}
// If there's a bank, and leader is available, forward the packets
if bank.is_some() && leader.is_some() {
let _ =
Self::forward_unprocessed_packets(&socket, &leader.unwrap().tpu, &buffered_packets);
return true;
}
return false;
}
fn should_buffer_packets(
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) -> bool {
let bank = poh_recorder.lock().unwrap().bank();
let rcluster_info = cluster_info.read().unwrap();
let my_id = rcluster_info.id();
let leader = rcluster_info.leader_data();
// Buffer the packets if it was getting sent to me
if bank.is_none() && leader.is_some() && my_id == leader.unwrap().id {
return true;
}
if let Some(bank) = bank {
// Buffer the packets if I am the next leader
if leader_schedule_utils::slot_leader_at(bank.slot() + 1, &bank).unwrap() == my_id {
return true;
}
}
return false;
}
pub fn process_loop(
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packets = vec![];
loop {
if Self::forward_buffered_packets(
&socket,
poh_recorder,
cluster_info,
&buffered_packets,
) {
buffered_packets.clear();
}
match Self::process_packets(&verified_receiver, &poh_recorder) {
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Ok(unprocessed_packets) => {
if Self::should_buffer_packets(poh_recorder, cluster_info) {
buffered_packets.extend_from_slice(&unprocessed_packets);
continue;
}
if let Some(leader) = cluster_info.read().unwrap().leader_data() {
let _ = Self::forward_unprocessed_packets(
&socket,
&leader.tpu,
unprocessed_packets,
&unprocessed_packets,
);
}
}