removes redundant epoch stakes cache in retransmit (#14781)

Following d6d76219b, staked nodes computed from vote accounts are
already cached in runtime::Stakes, so the caching in retransmit_stage is
redundant.
This commit is contained in:
behzad nouri 2021-01-24 21:15:09 +00:00 committed by GitHub
parent 0d32a0e0f4
commit e1021d9f83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 22 additions and 44 deletions

View File

@ -35,9 +35,8 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
cluster_info.insert_info(contact_info);
stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
}
let stakes = Arc::new(stakes);
let cluster_info = Arc::new(cluster_info);
let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(stakes));
let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes));
let shreds = Arc::new(shreds);
let last_datapoint = Arc::new(AtomicU64::new(0));
bencher.iter(move || {

View File

@ -359,9 +359,9 @@ fn update_peer_stats(
}
}
pub fn get_broadcast_peers<S: std::hash::BuildHasher>(
pub fn get_broadcast_peers(
cluster_info: &ClusterInfo,
stakes: Option<Arc<HashMap<Pubkey, u64, S>>>,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
use crate::cluster_info;
let mut peers = cluster_info.tvu_peers();

View File

@ -135,7 +135,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
) -> Result<()> {
let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
// Broadcast data
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes.as_deref());
broadcast_shreds(
sock,

View File

@ -319,7 +319,7 @@ impl StandardBroadcastRun {
&mut self,
sock: &UdpSocket,
cluster_info: &ClusterInfo,
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
stakes: Option<&HashMap<Pubkey, u64>>,
shreds: Arc<Vec<Shred>>,
broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
) -> Result<()> {
@ -432,7 +432,7 @@ impl BroadcastRun for StandardBroadcastRun {
sock: &UdpSocket,
) -> Result<()> {
let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
self.broadcast(sock, cluster_info, stakes, shreds, slot_start_ts)
self.broadcast(sock, cluster_info, stakes.as_deref(), shreds, slot_start_ts)
}
fn record(
&mut self,

View File

@ -1374,9 +1374,9 @@ impl ClusterInfo {
|| !ContactInfo::is_valid_address(&contact_info.tvu)
}
fn sorted_stakes_with_index<S: std::hash::BuildHasher>(
fn sorted_stakes_with_index(
peers: &[ContactInfo],
stakes: Option<Arc<HashMap<Pubkey, u64, S>>>,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> Vec<(u64, usize)> {
let stakes_and_index: Vec<_> = peers
.iter()
@ -1417,7 +1417,7 @@ impl ClusterInfo {
// Return sorted_retransmit_peers(including self) and their stakes
pub fn sorted_retransmit_peers_and_stakes(
&self,
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
let mut peers = self.retransmit_peers();
// insert "self" into this list for the layer and neighborhood computation
@ -3183,9 +3183,9 @@ impl Node {
}
}
pub fn stake_weight_peers<S: std::hash::BuildHasher>(
pub fn stake_weight_peers(
peers: &mut Vec<ContactInfo>,
stakes: Option<Arc<HashMap<Pubkey, u64, S>>>,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> Vec<(u64, usize)> {
peers.dedup();
ClusterInfo::sorted_stakes_with_index(peers, stakes)
@ -4066,9 +4066,8 @@ mod tests {
cluster_info.insert_info(contact_info);
stakes.insert(id4, 10);
let stakes = Arc::new(stakes);
let mut peers = cluster_info.tvu_peers();
let peers_and_stakes = stake_weight_peers(&mut peers, Some(stakes));
let peers_and_stakes = stake_weight_peers(&mut peers, Some(&stakes));
assert_eq!(peers.len(), 2);
assert_eq!(peers[0].id, id);
assert_eq!(peers[1].id, id2);

View File

@ -25,11 +25,7 @@ use solana_metrics::inc_new_counter_error;
use solana_perf::packet::{Packet, Packets};
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{
clock::{Epoch, Slot},
epoch_schedule::EpochSchedule,
feature_set,
pubkey::Pubkey,
timing::timestamp,
clock::Slot, epoch_schedule::EpochSchedule, feature_set, pubkey::Pubkey, timing::timestamp,
};
use solana_streamer::streamer::PacketReceiver;
use std::{
@ -202,8 +198,6 @@ fn update_retransmit_stats(
#[derive(Default)]
struct EpochStakesCache {
epoch: Epoch,
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
peers: Vec<ContactInfo>,
stakes_and_index: Vec<(u64, usize)>,
}
@ -295,40 +289,27 @@ fn retransmit(
epoch_fetch.stop();
let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
let mut r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
if r_epoch_stakes_cache.epoch != bank_epoch {
drop(r_epoch_stakes_cache);
let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
if w_epoch_stakes_cache.epoch != bank_epoch {
let stakes = r_bank.epoch_staked_nodes(bank_epoch);
let stakes = stakes.map(Arc::new);
w_epoch_stakes_cache.stakes = stakes;
w_epoch_stakes_cache.epoch = bank_epoch;
}
drop(w_epoch_stakes_cache);
r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
}
let now = timestamp();
let last = last_peer_update.load(Ordering::Relaxed);
#[allow(deprecated)]
if now.saturating_sub(last) > 1000
&& last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
{
drop(r_epoch_stakes_cache);
let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch);
let (peers, stakes_and_index) =
cluster_info.sorted_retransmit_peers_and_stakes(w_epoch_stakes_cache.stakes.clone());
w_epoch_stakes_cache.peers = peers;
w_epoch_stakes_cache.stakes_and_index = stakes_and_index;
drop(w_epoch_stakes_cache);
r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
cluster_info.sorted_retransmit_peers_and_stakes(epoch_staked_nodes.as_ref());
{
let mut epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
epoch_stakes_cache.peers = peers;
epoch_stakes_cache.stakes_and_index = stakes_and_index;
}
{
let mut sr = shreds_received.lock().unwrap();
sr.0.clear();
sr.1.reset();
}
}
let r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
let mut peers_len = 0;
epoch_cache_update.stop();

View File

@ -108,14 +108,13 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
});
let c_info = cluster_info.clone_with_id(&cluster_info.id());
let staked_nodes = Arc::new(staked_nodes);
let shreds_len = 100;
let shuffled_peers: Vec<Vec<ContactInfo>> = (0..shreds_len as i32)
.map(|i| {
let mut seed = [0; 32];
seed[0..4].copy_from_slice(&i.to_le_bytes());
let (peers, stakes_and_index) =
cluster_info.sorted_retransmit_peers_and_stakes(Some(staked_nodes.clone()));
cluster_info.sorted_retransmit_peers_and_stakes(Some(&staked_nodes));
let (_, shuffled_stakes_and_indexes) = ClusterInfo::shuffle_peers_and_index(
&cluster_info.id(),
&peers,