Use Shared IP to Stake Map (#25377)

* Make the find-packet-sender-stake stage use the shared IP-to-stake map
This commit is contained in:
Brennan Watt 2022-05-20 12:51:07 -07:00 committed by GitHub
parent cfcc18f7f4
commit 2fdc850176
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 92 deletions

View File

@ -2,11 +2,9 @@ use {
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
lazy_static::lazy_static,
rayon::{prelude::*, ThreadPool},
solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_rayon_threadlimit::get_thread_count,
solana_runtime::bank_forks::BankForks,
solana_sdk::timing::timestamp,
solana_streamer::streamer::{self, StreamerError},
std::{
@ -14,12 +12,9 @@ use {
net::IpAddr,
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
};
const IP_TO_STAKE_REFRESH_DURATION: Duration = Duration::from_secs(5);
lazy_static! {
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
@ -82,92 +77,58 @@ impl FindPacketSenderStakeStage {
pub fn new(
packet_receiver: streamer::PacketBatchReceiver,
sender: FindPacketSenderStakeSender,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
name: &'static str,
) -> Self {
let mut stats = FindPacketSenderStakeStats::default();
let thread_hdl = Builder::new()
.name("find-packet-sender-stake".to_string())
.spawn(move || {
let mut last_stakes = Instant::now();
let mut ip_to_stake: HashMap<IpAddr, u64> = HashMap::new();
loop {
let mut refresh_ip_to_stake_time = Measure::start("refresh_ip_to_stake_time");
Self::try_refresh_ip_to_stake(
&mut last_stakes,
&mut ip_to_stake,
bank_forks.clone(),
cluster_info.clone(),
);
refresh_ip_to_stake_time.stop();
stats.refresh_ip_to_stake_time = stats
.refresh_ip_to_stake_time
.saturating_add(refresh_ip_to_stake_time.as_us());
match streamer::recv_packet_batches(&packet_receiver) {
Ok((mut batches, num_packets, recv_duration)) => {
let num_batches = batches.len();
let mut apply_sender_stakes_time =
Measure::start("apply_sender_stakes_time");
.spawn(move || loop {
match streamer::recv_packet_batches(&packet_receiver) {
Ok((mut batches, num_packets, recv_duration)) => {
let num_batches = batches.len();
let mut apply_sender_stakes_time =
Measure::start("apply_sender_stakes_time");
let mut apply_stake = || {
let ip_to_stake = staked_nodes.read().unwrap();
Self::apply_sender_stakes(&mut batches, &ip_to_stake);
apply_sender_stakes_time.stop();
};
apply_stake();
apply_sender_stakes_time.stop();
let mut send_batches_time = Measure::start("send_batches_time");
if let Err(e) = sender.send(batches) {
info!("Sender error: {:?}", e);
}
send_batches_time.stop();
stats.apply_sender_stakes_time = stats
.apply_sender_stakes_time
.saturating_add(apply_sender_stakes_time.as_us());
stats.send_batches_time = stats
.send_batches_time
.saturating_add(send_batches_time.as_us());
stats.receive_batches_time = stats
.receive_batches_time
.saturating_add(recv_duration.as_nanos() as u64);
stats.total_batches =
stats.total_batches.saturating_add(num_batches as u64);
stats.total_packets =
stats.total_packets.saturating_add(num_packets as u64);
let mut send_batches_time = Measure::start("send_batches_time");
if let Err(e) = sender.send(batches) {
info!("Sender error: {:?}", e);
}
Err(e) => match e {
StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
_ => error!("error: {:?}", e),
},
}
send_batches_time.stop();
stats.report(name);
stats.apply_sender_stakes_time = stats
.apply_sender_stakes_time
.saturating_add(apply_sender_stakes_time.as_us());
stats.send_batches_time = stats
.send_batches_time
.saturating_add(send_batches_time.as_us());
stats.receive_batches_time = stats
.receive_batches_time
.saturating_add(recv_duration.as_nanos() as u64);
stats.total_batches =
stats.total_batches.saturating_add(num_batches as u64);
stats.total_packets =
stats.total_packets.saturating_add(num_packets as u64);
}
Err(e) => match e {
StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
_ => error!("error: {:?}", e),
},
}
stats.report(name);
})
.unwrap();
Self { thread_hdl }
}
fn try_refresh_ip_to_stake(
last_stakes: &mut Instant,
ip_to_stake: &mut HashMap<IpAddr, u64>,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
) {
if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION {
let root_bank = bank_forks.read().unwrap().root_bank();
let staked_nodes = root_bank.staked_nodes();
*ip_to_stake = cluster_info
.tvu_peers()
.into_iter()
.filter_map(|node| {
let stake = staked_nodes.get(&node.id)?;
Some((node.tvu.ip(), *stake))
})
.collect();
*last_stakes = Instant::now();
}
}
fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) {
PAR_THREAD_POOL.install(|| {
batches

View File

@ -32,14 +32,15 @@ impl StakedNodesUpdaterService {
let mut last_stakes = Instant::now();
while !exit.load(Ordering::Relaxed) {
let mut new_ip_to_stake = HashMap::new();
Self::try_refresh_ip_to_stake(
if Self::try_refresh_ip_to_stake(
&mut last_stakes,
&mut new_ip_to_stake,
&bank_forks,
&cluster_info,
);
let mut shared = shared_staked_nodes.write().unwrap();
*shared = new_ip_to_stake;
) {
let mut shared = shared_staked_nodes.write().unwrap();
*shared = new_ip_to_stake;
}
}
})
.unwrap();
@ -52,7 +53,7 @@ impl StakedNodesUpdaterService {
ip_to_stake: &mut HashMap<IpAddr, u64>,
bank_forks: &RwLock<BankForks>,
cluster_info: &ClusterInfo,
) {
) -> bool {
if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION {
let root_bank = bank_forks.read().unwrap().root_bank();
let staked_nodes = root_bank.staked_nodes();
@ -65,8 +66,10 @@ impl StakedNodesUpdaterService {
})
.collect();
*last_stakes = Instant::now();
true
} else {
sleep(Duration::from_millis(1));
false
}
}

View File

@ -116,13 +116,20 @@ impl Tpu {
Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
);
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes_updater_service = StakedNodesUpdaterService::new(
exit.clone(),
cluster_info.clone(),
bank_forks.clone(),
staked_nodes.clone(),
);
let (find_packet_sender_stake_sender, find_packet_sender_stake_receiver) = unbounded();
let find_packet_sender_stake_stage = FindPacketSenderStakeStage::new(
packet_receiver,
find_packet_sender_stake_sender,
bank_forks.clone(),
cluster_info.clone(),
staked_nodes.clone(),
"tpu-find-packet-sender-stake",
);
@ -132,20 +139,12 @@ impl Tpu {
let vote_find_packet_sender_stake_stage = FindPacketSenderStakeStage::new(
vote_packet_receiver,
vote_find_packet_sender_stake_sender,
bank_forks.clone(),
cluster_info.clone(),
staked_nodes.clone(),
"tpu-vote-find-packet-sender-stake",
);
let (verified_sender, verified_receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes_updater_service = StakedNodesUpdaterService::new(
exit.clone(),
cluster_info.clone(),
bank_forks.clone(),
staked_nodes.clone(),
);
let tpu_quic_t = spawn_server(
transactions_quic_sockets,
keypair,