diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 9f2c2b3f6e..15c4c815d4 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -44,6 +44,9 @@ pub(crate) mod broadcast_utils; mod fail_entry_verification_broadcast_run; mod standard_broadcast_run; +const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8; +const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); + pub(crate) const NUM_INSERT_THREADS: usize = 2; pub(crate) type RetransmitSlotsSender = CrossbeamSender>>; pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver>>; diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 8e7ba4a479..a593973a5e 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -1,11 +1,14 @@ -use super::*; -use solana_entry::entry::Entry; -use solana_ledger::shred::Shredder; -use solana_runtime::blockhash_queue::BlockhashQueue; -use solana_sdk::{ - hash::Hash, - signature::{Keypair, Signer}, - system_transaction, +use { + super::*, + crate::cluster_nodes::ClusterNodesCache, + solana_entry::entry::Entry, + solana_ledger::shred::Shredder, + solana_runtime::blockhash_queue::BlockhashQueue, + solana_sdk::{ + hash::Hash, + signature::{Keypair, Signer}, + system_transaction, + }, }; pub const MINIMUM_DUPLICATE_SLOT: Slot = 20; @@ -32,10 +35,15 @@ pub(super) struct BroadcastDuplicatesRun { recent_blockhash: Option, prev_entry_hash: Option, num_slots_broadcasted: usize, + cluster_nodes_cache: Arc>, } impl BroadcastDuplicatesRun { pub(super) fn new(shred_version: u16, config: BroadcastDuplicatesConfig) -> Self { + let cluster_nodes_cache = Arc::new(ClusterNodesCache::::new( + CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, + CLUSTER_NODES_CACHE_TTL, + )); Self { config, duplicate_queue: BlockhashQueue::default(), @@ -47,6 +55,7 @@ impl BroadcastDuplicatesRun { recent_blockhash: None, prev_entry_hash: None, num_slots_broadcasted: 0, + cluster_nodes_cache, } } } @@ -220,11 +229,8 @@ impl BroadcastRun for BroadcastDuplicatesRun { ) -> Result<()> { let ((slot, shreds), _) = receiver.lock().unwrap().recv()?; let root_bank = bank_forks.read().unwrap().root_bank(); - let epoch = root_bank.get_leader_schedule_epoch(slot); - let stakes = root_bank.epoch_staked_nodes(epoch); // Broadcast data - let cluster_nodes = - ClusterNodes::::new(cluster_info, &stakes.unwrap_or_default()); + let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info); broadcast_shreds( sock, &shreds, diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 39d92af654..978421d33d 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -1,9 +1,10 @@ -use super::*; -use crate::cluster_nodes::ClusterNodes; -use solana_ledger::shred::Shredder; -use solana_sdk::hash::Hash; -use solana_sdk::signature::Keypair; -use std::{thread::sleep, time::Duration}; +use { + super::*, + crate::cluster_nodes::ClusterNodesCache, + solana_ledger::shred::Shredder, + solana_sdk::{hash::Hash, signature::Keypair}, + std::{thread::sleep, time::Duration}, +}; pub const NUM_BAD_SLOTS: u64 = 10; pub const SLOT_TO_RESOLVE: u64 = 32; @@ -14,15 +15,21 @@ pub(super) struct FailEntryVerificationBroadcastRun { good_shreds: Vec, current_slot: Slot, next_shred_index: u32, + cluster_nodes_cache: Arc>, } impl FailEntryVerificationBroadcastRun { pub(super) fn new(shred_version: u16) -> Self { + let cluster_nodes_cache = Arc::new(ClusterNodesCache::::new( + CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, + CLUSTER_NODES_CACHE_TTL, + )); Self { shred_version, good_shreds: vec![], current_slot: 0, next_shred_index: 0, + cluster_nodes_cache, } } } @@ -132,11 +139,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { ) -> Result<()> { let ((slot, shreds), _) = receiver.lock().unwrap().recv()?; let root_bank = bank_forks.read().unwrap().root_bank(); - let epoch = root_bank.get_leader_schedule_epoch(slot); - let stakes = root_bank.epoch_staked_nodes(epoch); // Broadcast data - let cluster_nodes = - ClusterNodes::::new(cluster_info, &stakes.unwrap_or_default()); + let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info); broadcast_shreds( sock, &shreds, diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index db02a0b3e4..7688abefcc 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -5,7 +5,9 @@ use { broadcast_utils::{self, ReceiveResults}, *, }, - crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes}, + crate::{ + broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache, + }, solana_entry::entry::Entry, solana_ledger::shred::{ ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, @@ -29,12 +31,16 @@ pub struct StandardBroadcastRun { shred_version: u16, last_datapoint_submit: Arc, num_batches: usize, - cluster_nodes: Arc>>, + cluster_nodes_cache: Arc>, last_peer_update: Arc, } impl StandardBroadcastRun { pub(super) fn new(shred_version: u16) -> Self { + let cluster_nodes_cache = Arc::new(ClusterNodesCache::::new( + CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, + CLUSTER_NODES_CACHE_TTL, + )); Self { process_shreds_stats: ProcessShredsStats::default(), transmit_shreds_stats: Arc::default(), @@ -45,7 +51,7 @@ impl StandardBroadcastRun { shred_version, last_datapoint_submit: Arc::default(), num_batches: 0, - cluster_nodes: Arc::default(), + cluster_nodes_cache, last_peer_update: Arc::new(AtomicInterval::default()), } } @@ -342,12 +348,8 @@ impl StandardBroadcastRun { // Get the list of peers to broadcast to let mut get_peers_time = Measure::start("broadcast::get_peers"); let root_bank = bank_forks.read().unwrap().root_bank(); - let epoch = root_bank.get_leader_schedule_epoch(slot); - let stakes = root_bank.epoch_staked_nodes(epoch); - *self.cluster_nodes.write().unwrap() = - ClusterNodes::::new(cluster_info, &stakes.unwrap_or_default()); + let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info); get_peers_time.stop(); - let cluster_nodes = self.cluster_nodes.read().unwrap(); let mut transmit_stats = TransmitShredsStats::default(); // Broadcast the shreds @@ -363,7 +365,6 @@ impl StandardBroadcastRun { bank_forks, cluster_info.socket_addr_space(), )?; - drop(cluster_nodes); transmit_time.stop(); transmit_stats.transmit_elapsed = transmit_time.as_us();