diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index f4aaf54b76..e6b7694461 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -35,9 +35,8 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { cluster_info.insert_info(contact_info); stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64); } - let stakes = Arc::new(stakes); let cluster_info = Arc::new(cluster_info); - let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(stakes)); + let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes)); let shreds = Arc::new(shreds); let last_datapoint = Arc::new(AtomicU64::new(0)); bencher.iter(move || { diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 012a92c7a8..8c182e057f 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -359,9 +359,9 @@ fn update_peer_stats( } } -pub fn get_broadcast_peers( +pub fn get_broadcast_peers( cluster_info: &ClusterInfo, - stakes: Option>>, + stakes: Option<&HashMap>, ) -> (Vec, Vec<(u64, usize)>) { use crate::cluster_info; let mut peers = cluster_info.tvu_peers(); diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index fcfd39104b..17d34e99a1 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -135,7 +135,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { ) -> Result<()> { let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?; // Broadcast data - let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes); + let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes.as_deref()); broadcast_shreds( sock, diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 2b1676aff0..d91da296e4 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -319,7 +319,7 @@ impl StandardBroadcastRun { &mut self, sock: &UdpSocket, cluster_info: &ClusterInfo, - stakes: Option>>, + stakes: Option<&HashMap>, shreds: Arc>, broadcast_shred_batch_info: Option, ) -> Result<()> { @@ -432,7 +432,7 @@ impl BroadcastRun for StandardBroadcastRun { sock: &UdpSocket, ) -> Result<()> { let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?; - self.broadcast(sock, cluster_info, stakes, shreds, slot_start_ts) + self.broadcast(sock, cluster_info, stakes.as_deref(), shreds, slot_start_ts) } fn record( &mut self, diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 00ea0b5176..dacddc70b2 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1374,9 +1374,9 @@ impl ClusterInfo { || !ContactInfo::is_valid_address(&contact_info.tvu) } - fn sorted_stakes_with_index( + fn sorted_stakes_with_index( peers: &[ContactInfo], - stakes: Option>>, + stakes: Option<&HashMap>, ) -> Vec<(u64, usize)> { let stakes_and_index: Vec<_> = peers .iter() @@ -1417,7 +1417,7 @@ impl ClusterInfo { // Return sorted_retransmit_peers(including self) and their stakes pub fn sorted_retransmit_peers_and_stakes( &self, - stakes: Option>>, + stakes: Option<&HashMap>, ) -> (Vec, Vec<(u64, usize)>) { let mut peers = self.retransmit_peers(); // insert "self" into this list for the layer and neighborhood computation @@ -3183,9 +3183,9 @@ impl Node { } } -pub fn stake_weight_peers( +pub fn stake_weight_peers( peers: &mut Vec, - stakes: Option>>, + stakes: Option<&HashMap>, ) -> Vec<(u64, usize)> { peers.dedup(); ClusterInfo::sorted_stakes_with_index(peers, stakes) @@ -4066,9 +4066,8 @@ mod tests { cluster_info.insert_info(contact_info); stakes.insert(id4, 10); - let stakes = Arc::new(stakes); let mut peers = cluster_info.tvu_peers(); - let peers_and_stakes = stake_weight_peers(&mut peers, Some(stakes)); + let peers_and_stakes = stake_weight_peers(&mut peers, Some(&stakes)); assert_eq!(peers.len(), 2); assert_eq!(peers[0].id, id); assert_eq!(peers[1].id, id2); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 9977f7c18d..fc19084c09 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -25,11 +25,7 @@ use solana_metrics::inc_new_counter_error; use solana_perf::packet::{Packet, Packets}; use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_sdk::{ - clock::{Epoch, Slot}, - epoch_schedule::EpochSchedule, - feature_set, - pubkey::Pubkey, - timing::timestamp, + clock::Slot, epoch_schedule::EpochSchedule, feature_set, pubkey::Pubkey, timing::timestamp, }; use solana_streamer::streamer::PacketReceiver; use std::{ @@ -202,8 +198,6 @@ fn update_retransmit_stats( #[derive(Default)] struct EpochStakesCache { - epoch: Epoch, - stakes: Option>>, peers: Vec, stakes_and_index: Vec<(u64, usize)>, } @@ -295,40 +289,27 @@ fn retransmit( epoch_fetch.stop(); let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update"); - let mut r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap(); - if r_epoch_stakes_cache.epoch != bank_epoch { - drop(r_epoch_stakes_cache); - let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap(); - if w_epoch_stakes_cache.epoch != bank_epoch { - let stakes = r_bank.epoch_staked_nodes(bank_epoch); - let stakes = stakes.map(Arc::new); - w_epoch_stakes_cache.stakes = stakes; - w_epoch_stakes_cache.epoch = bank_epoch; - } - drop(w_epoch_stakes_cache); - r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap(); - } - let now = timestamp(); let last = last_peer_update.load(Ordering::Relaxed); #[allow(deprecated)] if now.saturating_sub(last) > 1000 && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last { - drop(r_epoch_stakes_cache); - let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap(); + let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch); let (peers, stakes_and_index) = - cluster_info.sorted_retransmit_peers_and_stakes(w_epoch_stakes_cache.stakes.clone()); - w_epoch_stakes_cache.peers = peers; - w_epoch_stakes_cache.stakes_and_index = stakes_and_index; - drop(w_epoch_stakes_cache); - r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap(); + cluster_info.sorted_retransmit_peers_and_stakes(epoch_staked_nodes.as_ref()); + { + let mut epoch_stakes_cache = epoch_stakes_cache.write().unwrap(); + epoch_stakes_cache.peers = peers; + epoch_stakes_cache.stakes_and_index = stakes_and_index; + } { let mut sr = shreds_received.lock().unwrap(); sr.0.clear(); sr.1.reset(); } } + let r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap(); let mut peers_len = 0; epoch_cache_update.stop(); diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs index f095cbfa6a..fcd40e6fbc 100644 --- a/core/tests/cluster_info.rs +++ b/core/tests/cluster_info.rs @@ -108,14 +108,13 @@ fn run_simulation(stakes: &[u64], fanout: usize) { }); let c_info = cluster_info.clone_with_id(&cluster_info.id()); - let staked_nodes = Arc::new(staked_nodes); let shreds_len = 100; let shuffled_peers: Vec> = (0..shreds_len as i32) .map(|i| { let mut seed = [0; 32]; seed[0..4].copy_from_slice(&i.to_le_bytes()); let (peers, stakes_and_index) = - cluster_info.sorted_retransmit_peers_and_stakes(Some(staked_nodes.clone())); + cluster_info.sorted_retransmit_peers_and_stakes(Some(&staked_nodes)); let (_, shuffled_stakes_and_indexes) = ClusterInfo::shuffle_peers_and_index( &cluster_info.id(), &peers,