From 30bec3921ef2d48f26223f83e00fd775fa4f9e67 Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Wed, 28 Jul 2021 11:52:39 -0400
Subject: [PATCH] uses cluster-nodes cache in retransmit stage

The new cluster-nodes cache will:
  * ensure cluster-nodes are recalculated if the epoch (and so the epoch
    staked nodes) changes.
  * encapsulate time-to-live eviction policy.
---
 core/src/retransmit_stage.rs | 78 ++++++++++++++----------------------
 1 file changed, 31 insertions(+), 47 deletions(-)

diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index ce929e794f..e659236cf6 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -5,7 +5,7 @@ use {
     crate::{
         ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
         cluster_info_vote_listener::VerifiedVoteReceiver,
-        cluster_nodes::ClusterNodes,
+        cluster_nodes::ClusterNodesCache,
         cluster_slots::ClusterSlots,
         cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
         completed_data_sets_service::CompletedDataSetsSender,
@@ -28,7 +28,7 @@ use {
     solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
     solana_runtime::{bank::Bank, bank_forks::BankForks},
     solana_sdk::{
-        clock::{Epoch, Slot},
+        clock::Slot,
         epoch_schedule::EpochSchedule,
         pubkey::Pubkey,
         timing::{timestamp, AtomicInterval},
@@ -58,6 +58,9 @@ const DEFAULT_LRU_SIZE: usize = 10_000;
 // it doesn't pull up too much work.
 const MAX_PACKET_BATCH_SIZE: usize = 100;
 
+const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
+const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
+
 #[derive(Default)]
 struct RetransmitStats {
     total_packets: AtomicU64,
@@ -275,36 +278,23 @@ fn check_if_first_shred_received(
     }
 }
 
-fn maybe_update_peers_cache(
-    cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
+fn maybe_reset_shreds_received_cache(
     shreds_received: &Mutex<ShredFilterAndHasher>,
-    last_peer_update: &AtomicU64,
-    cluster_info: &ClusterInfo,
-    bank_epoch: Epoch,
-    working_bank: &Bank,
+    hasher_reset_ts: &AtomicU64,
 ) {
     const UPDATE_INTERVAL_MS: u64 = 1000;
-    if timestamp().saturating_sub(last_peer_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
-        return;
-    }
+    let now = timestamp();
+    let prev = hasher_reset_ts.load(Ordering::Acquire);
+    if now.saturating_sub(prev) > UPDATE_INTERVAL_MS
+        && hasher_reset_ts
+            .compare_exchange(prev, now, Ordering::AcqRel, Ordering::Acquire)
+            .is_ok()
     {
-        // Write-lock cluster-nodes here so that only one thread does the
-        // computations to update peers.
-        let mut cluster_nodes = cluster_nodes.write().unwrap();
-        let now = timestamp();
-        if now.saturating_sub(last_peer_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
-            return; // Some other thread has already done the update.
-        }
-        let epoch_staked_nodes = working_bank
-            .epoch_staked_nodes(bank_epoch)
-            .unwrap_or_default();
-        *cluster_nodes = ClusterNodes::<RetransmitStage>::new(cluster_info, &epoch_staked_nodes);
-        last_peer_update.store(now, Ordering::Release);
+        let mut shreds_received = shreds_received.lock().unwrap();
+        let (cache, hasher) = shreds_received.deref_mut();
+        cache.clear();
+        hasher.reset();
     }
-    let mut shreds_received = shreds_received.lock().unwrap();
-    let (cache, hasher) = shreds_received.deref_mut();
-    cache.clear();
-    hasher.reset();
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -316,8 +306,8 @@ fn retransmit(
     sock: &UdpSocket,
     id: u32,
     stats: &RetransmitStats,
-    cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
-    last_peer_update: &AtomicU64,
+    cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
+    hasher_reset_ts: &AtomicU64,
     shreds_received: &Mutex<ShredFilterAndHasher>,
     max_slots: &MaxSlots,
     first_shreds_received: &Mutex<BTreeSet<Slot>>,
@@ -343,20 +333,11 @@ fn retransmit(
         let bank_forks = bank_forks.read().unwrap();
         (bank_forks.working_bank(), bank_forks.root_bank())
     };
-    let bank_epoch = working_bank.get_leader_schedule_epoch(working_bank.slot());
     epoch_fetch.stop();
 
     let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
-    maybe_update_peers_cache(
-        cluster_nodes,
-        shreds_received,
-        last_peer_update,
-        cluster_info,
-        bank_epoch,
-        &working_bank,
-    );
-    let cluster_nodes = cluster_nodes.read().unwrap();
-    let peers_len = cluster_nodes.num_peers();
+    maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts);
+    let cluster_nodes = cluster_nodes_cache.get(cluster_info, &working_bank);
     epoch_cache_update.stop();
 
     let my_id = cluster_info.id();
@@ -459,7 +440,7 @@ fn retransmit(
         discard_total,
         repair_total,
         compute_turbine_peers_total,
-        peers_len,
+        cluster_nodes.num_peers(),
         packets_by_slot,
         packets_by_source,
         epoch_fetch.as_us(),
@@ -487,8 +468,11 @@ pub fn retransmitter(
     max_slots: Arc<MaxSlots>,
     rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
 ) -> Vec<JoinHandle<()>> {
-    let cluster_nodes = Arc::default();
-    let last_peer_update = Arc::default();
+    let cluster_nodes_cache = Arc::new(ClusterNodesCache::<RetransmitStage>::new(
+        CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
+        CLUSTER_NODES_CACHE_TTL,
+    ));
+    let hasher_reset_ts = Arc::default();
     let stats = Arc::new(RetransmitStats::default());
     let shreds_received = Arc::new(Mutex::new((
         LruCache::new(DEFAULT_LRU_SIZE),
@@ -503,8 +487,8 @@ pub fn retransmitter(
             let r = r.clone();
            let cluster_info = cluster_info.clone();
             let stats = stats.clone();
-            let cluster_nodes = Arc::clone(&cluster_nodes);
-            let last_peer_update = Arc::clone(&last_peer_update);
+            let cluster_nodes_cache = Arc::clone(&cluster_nodes_cache);
+            let hasher_reset_ts = Arc::clone(&hasher_reset_ts);
             let shreds_received = shreds_received.clone();
             let max_slots = max_slots.clone();
             let first_shreds_received = first_shreds_received.clone();
@@ -523,8 +507,8 @@ pub fn retransmitter(
                         &sockets[s],
                         s as u32,
                         &stats,
-                        &cluster_nodes,
-                        &last_peer_update,
+                        &cluster_nodes_cache,
+                        &hasher_reset_ts,
                         &shreds_received,
                         &max_slots,
                         &first_shreds_received,
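
Note: the ClusterNodesCache type itself lives in core/src/cluster_nodes.rs and is
not part of this diff. As a rough mental model only, the sketch below shows the two
properties the commit message relies on: entries are keyed by epoch, so a new epoch
forces recalculation from that epoch's staked nodes, and entries older than a TTL
are recomputed. All names here (ClusterNodesCacheSketch, CacheEntry, the closure
argument to get) are hypothetical; the real get(cluster_info, &working_bank)
derives the epoch from the bank internally.

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    time::{Duration, Instant},
};

// Hypothetical stand-ins; the real ClusterNodes<RetransmitStage> carries the
// weighted turbine peers for an epoch.
type Epoch = u64;
struct ClusterNodes;

struct CacheEntry {
    asof: Instant,            // when this epoch's nodes were computed
    nodes: Arc<ClusterNodes>, // shared snapshot handed out to callers
}

struct ClusterNodesCacheSketch {
    num_epoch_cap: usize, // cf. CLUSTER_NODES_CACHE_NUM_EPOCH_CAP
    ttl: Duration,        // cf. CLUSTER_NODES_CACHE_TTL
    entries: Mutex<HashMap<Epoch, CacheEntry>>,
}

impl ClusterNodesCacheSketch {
    fn new(num_epoch_cap: usize, ttl: Duration) -> Self {
        Self {
            num_epoch_cap,
            ttl,
            entries: Mutex::new(HashMap::new()),
        }
    }

    // Return the cached nodes for `epoch`, recomputing when the entry is
    // missing or older than the TTL. Keying by epoch is what guarantees
    // recalculation when the epoch (and so the epoch staked nodes) changes.
    fn get(&self, epoch: Epoch, compute: impl FnOnce() -> ClusterNodes) -> Arc<ClusterNodes> {
        let mut entries = self.entries.lock().unwrap();
        if let Some(entry) = entries.get(&epoch) {
            if entry.asof.elapsed() < self.ttl {
                return Arc::clone(&entry.nodes);
            }
        }
        let nodes = Arc::new(compute());
        if entries.len() >= self.num_epoch_cap {
            // Evict the oldest epoch to respect the capacity cap.
            if let Some(&oldest) = entries.keys().min() {
                entries.remove(&oldest);
            }
        }
        entries.insert(
            epoch,
            CacheEntry {
                asof: Instant::now(),
                nodes: Arc::clone(&nodes),
            },
        );
        nodes
    }
}

fn main() {
    let cache = ClusterNodesCacheSketch::new(8, Duration::from_secs(5));
    let _first = cache.get(42, || ClusterNodes); // computed on first access
    let _again = cache.get(42, || ClusterNodes); // served from cache within the TTL
}

Handing out Arc<ClusterNodes> snapshots is why the diff can drop both the
RwLock write path and the hasher_reset_ts-gated peer update: retransmit threads
take a cheap clone instead of blocking on one thread's recomputation, and the
compare_exchange on hasher_reset_ts now only guards the shreds-received reset.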