address review comments

This commit is contained in:
Pankaj Garg 2019-03-11 11:04:40 -07:00
parent a4acc631ee
commit 55f660d5f9
1 changed files with 32 additions and 49 deletions

View File

@ -17,6 +17,7 @@ use crate::sigverify_stage::VerifiedPackets;
use bincode::deserialize;
use solana_metrics::counter::Counter;
use solana_runtime::bank::{self, Bank, BankError};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{self, duration_as_us, MAX_RECENT_BLOCKHASHES};
use solana_sdk::transaction::Transaction;
use std::net::UdpSocket;
@ -103,67 +104,49 @@ impl BankingStage {
cluster_info: &Arc<RwLock<ClusterInfo>>,
buffered_packets: &[(SharedPackets, usize)],
) -> bool {
let (leader, my_id) = {
let rcluster_info = cluster_info.read().unwrap();
let leader_id = if let Some(leader) = rcluster_info.leader_data() {
Some(leader.clone())
} else {
None
};
(leader_id, rcluster_info.id())
};
let bank = poh_recorder.lock().unwrap().bank();
// if the current node is not the leader, forward the buffered packets
if bank.is_none() {
if let Some(leader) = leader.clone() {
if my_id == leader.id {
let _ = Self::forward_unprocessed_packets(
&socket,
&leader.forwarder,
&buffered_packets,
);
return true;
}
}
}
let rcluster_info = cluster_info.read().unwrap();
// If there's a bank, and leader is available, forward the packets
if bank.is_some() && leader.is_some() {
let _ = Self::forward_unprocessed_packets(
&socket,
&leader.unwrap().forwarder,
&buffered_packets,
);
return true;
}
// or, if the current node is not the leader, forward the buffered packets
let leader_id = match poh_recorder.lock().unwrap().bank() {
None => rcluster_info.leader_data().map(|x| x.id),
Some(_) => rcluster_info.leader_data().map(|_| Pubkey::default()), // returning default pubkey on purpose
};
false
match leader_id {
Some(leader) => {
if leader != rcluster_info.id() {
let _ = Self::forward_unprocessed_packets(
&socket,
&rcluster_info.leader_data().unwrap().forwarder,
&buffered_packets,
);
true
} else {
false
}
}
None => false,
}
}
fn should_buffer_packets(
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) -> bool {
let bank = poh_recorder.lock().unwrap().bank();
let rcluster_info = cluster_info.read().unwrap();
let my_id = rcluster_info.id();
let leader = rcluster_info.leader_data();
// Buffer the packets if it was getting sent to me
if bank.is_none() && leader.is_some() && my_id == leader.unwrap().id {
return true;
}
// Buffer the packets if I am the next leader
// or, if it was getting sent to me
let leader_id = match poh_recorder.lock().unwrap().bank() {
None => rcluster_info
.leader_data()
.map(|x| x.id)
.unwrap_or_default(),
Some(bank) => leader_schedule_utils::slot_leader_at(bank.slot() + 1, &bank).unwrap(),
};
if let Some(bank) = bank {
// Buffer the packets if I am the next leader
if leader_schedule_utils::slot_leader_at(bank.slot() + 1, &bank).unwrap() == my_id {
return true;
}
}
false
leader_id == rcluster_info.id()
}
pub fn process_loop(