remove tick_count, leader_scheduler, from broadcast code (#1725)
This commit is contained in:
parent
76694bfcf4
commit
9b43b00d5c
|
@ -6,7 +6,6 @@ use entry::Entry;
|
|||
#[cfg(feature = "erasure")]
|
||||
use erasure;
|
||||
use influx_db_client as influxdb;
|
||||
use leader_scheduler::LeaderScheduler;
|
||||
use ledger::Block;
|
||||
use log::Level;
|
||||
use metrics;
|
||||
|
@ -29,10 +28,7 @@ pub enum BroadcastStageReturnType {
|
|||
ChannelDisconnected,
|
||||
}
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
|
||||
fn broadcast(
|
||||
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
|
||||
mut tick_height: u64,
|
||||
node_info: &NodeInfo,
|
||||
broadcast_table: &[NodeInfo],
|
||||
window: &SharedWindow,
|
||||
|
@ -50,9 +46,6 @@ fn broadcast(
|
|||
ventries.push(entries);
|
||||
while let Ok(entries) = receiver.try_recv() {
|
||||
num_entries += entries.len();
|
||||
tick_height += entries
|
||||
.iter()
|
||||
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
|
||||
ventries.push(entries);
|
||||
}
|
||||
inc_new_counter_info!("broadcast_stage-entries_received", num_entries);
|
||||
|
@ -137,8 +130,6 @@ fn broadcast(
|
|||
|
||||
// Send blobs out from the window
|
||||
ClusterInfo::broadcast(
|
||||
&leader_scheduler,
|
||||
tick_height,
|
||||
&node_info,
|
||||
&broadcast_table,
|
||||
&window,
|
||||
|
@ -198,8 +189,6 @@ impl BroadcastStage {
|
|||
window: &SharedWindow,
|
||||
entry_height: u64,
|
||||
receiver: &Receiver<Vec<Entry>>,
|
||||
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
|
||||
tick_height: u64,
|
||||
) -> BroadcastStageReturnType {
|
||||
let mut transmit_index = WindowIndex {
|
||||
data: entry_height,
|
||||
|
@ -210,8 +199,6 @@ impl BroadcastStage {
|
|||
loop {
|
||||
let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table();
|
||||
if let Err(e) = broadcast(
|
||||
leader_scheduler,
|
||||
tick_height,
|
||||
&me,
|
||||
&broadcast_table,
|
||||
&window,
|
||||
|
@ -256,23 +243,13 @@ impl BroadcastStage {
|
|||
window: SharedWindow,
|
||||
entry_height: u64,
|
||||
receiver: Receiver<Vec<Entry>>,
|
||||
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
|
||||
tick_height: u64,
|
||||
exit_sender: Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
let thread_hdl = Builder::new()
|
||||
.name("solana-broadcaster".to_string())
|
||||
.spawn(move || {
|
||||
let _exit = Finalizer::new(exit_sender);
|
||||
Self::run(
|
||||
&sock,
|
||||
&cluster_info,
|
||||
&window,
|
||||
entry_height,
|
||||
&receiver,
|
||||
&leader_scheduler,
|
||||
tick_height,
|
||||
)
|
||||
Self::run(&sock, &cluster_info, &window, entry_height, &receiver)
|
||||
}).unwrap();
|
||||
|
||||
BroadcastStage { thread_hdl }
|
||||
|
|
|
@ -16,7 +16,6 @@ use bincode::{deserialize, serialize, serialized_size};
|
|||
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
|
||||
use counter::Counter;
|
||||
use hash::Hash;
|
||||
use leader_scheduler::LeaderScheduler;
|
||||
use ledger::LedgerWindow;
|
||||
use log::Level;
|
||||
use netutil::{bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range};
|
||||
|
@ -461,10 +460,7 @@ impl ClusterInfo {
|
|||
/// broadcast messages from the leader to layer 1 nodes
|
||||
/// # Remarks
|
||||
/// We need to avoid having obj locked while doing any io, such as the `send_to`
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
|
||||
pub fn broadcast(
|
||||
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
|
||||
tick_height: u64,
|
||||
me: &NodeInfo,
|
||||
broadcast_table: &[NodeInfo],
|
||||
window: &SharedWindow,
|
||||
|
@ -506,35 +502,6 @@ impl ClusterInfo {
|
|||
br_idx
|
||||
);
|
||||
|
||||
// Make sure the next leader in line knows about the entries before his slot in the leader
|
||||
// rotation so they can initiate repairs if necessary
|
||||
{
|
||||
let ls_lock = leader_scheduler.read().unwrap();
|
||||
let next_leader_height = ls_lock.max_height_for_leader(tick_height);
|
||||
let next_leader_id =
|
||||
next_leader_height.map(|nlh| ls_lock.get_scheduled_leader(nlh));
|
||||
// In the case the next scheduled leader is None, then the write_stage moved
|
||||
// the schedule too far ahead and we no longer are in the known window
|
||||
// (will happen during calculation of the next set of slots every epoch or
|
||||
// seed_rotation_interval heights when we move the window forward in the
|
||||
// LeaderScheduler). For correctness, this is fine write_stage will never send
|
||||
// blobs past the point of when this node should stop being leader, so we just
|
||||
// continue broadcasting until we catch up to write_stage. The downside is we
|
||||
// can't guarantee the current leader will broadcast the last entry to the next
|
||||
// scheduled leader, so the next leader will have to rely on avalanche/repairs
|
||||
// to get this last blob, which could cause slowdowns during leader handoffs.
|
||||
// See corresponding issue for repairs in repair() function in window.rs.
|
||||
if let Some(Some(next_leader_id)) = next_leader_id {
|
||||
if next_leader_id == me.id {
|
||||
break;
|
||||
}
|
||||
let info_result = broadcast_table.iter().position(|n| n.id == next_leader_id);
|
||||
if let Some(index) = info_result {
|
||||
orders.push((window_l[w_idx].data.clone(), &broadcast_table[index]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx]));
|
||||
br_idx += 1;
|
||||
br_idx %= broadcast_table.len();
|
||||
|
|
|
@ -346,8 +346,6 @@ impl Fullnode {
|
|||
shared_window.clone(),
|
||||
entry_height,
|
||||
entry_receiver,
|
||||
bank.leader_scheduler.clone(),
|
||||
bank.tick_height(),
|
||||
tpu_exit,
|
||||
);
|
||||
let leader_state = LeaderServices::new(tpu, broadcast_stage);
|
||||
|
@ -498,8 +496,6 @@ impl Fullnode {
|
|||
self.shared_window.clone(),
|
||||
entry_height,
|
||||
blob_receiver,
|
||||
self.bank.leader_scheduler.clone(),
|
||||
tick_height,
|
||||
tpu_exit,
|
||||
);
|
||||
let leader_state = LeaderServices::new(tpu, broadcast_stage);
|
||||
|
|
|
@ -54,7 +54,6 @@ pub struct Tpu {
|
|||
}
|
||||
|
||||
impl Tpu {
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
|
||||
pub fn new(
|
||||
bank: &Arc<Bank>,
|
||||
tick_duration: Config,
|
||||
|
|
|
@ -52,7 +52,6 @@ pub trait WindowUtil {
|
|||
|
||||
fn window_size(&self) -> u64;
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
|
||||
fn repair(
|
||||
&mut self,
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
|
@ -67,7 +66,6 @@ pub trait WindowUtil {
|
|||
|
||||
fn print(&self, id: &Pubkey, consumed: u64) -> String;
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
|
||||
fn process_blob(
|
||||
&mut self,
|
||||
id: &Pubkey,
|
||||
|
|
Loading…
Reference in New Issue