diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs
index 80ef798c35..8c97735448 100644
--- a/core/src/cluster_nodes.rs
+++ b/core/src/cluster_nodes.rs
@@ -1,14 +1,26 @@
 use {
     crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
     itertools::Itertools,
+    lru::LruCache,
     solana_gossip::{
         cluster_info::{compute_retransmit_peers, ClusterInfo},
         contact_info::ContactInfo,
         crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
         weighted_shuffle::{weighted_best, weighted_shuffle},
     },
-    solana_sdk::pubkey::Pubkey,
-    std::{any::TypeId, cmp::Reverse, collections::HashMap, marker::PhantomData},
+    solana_runtime::bank::Bank,
+    solana_sdk::{
+        clock::{Epoch, Slot},
+        pubkey::Pubkey,
+    },
+    std::{
+        any::TypeId,
+        cmp::Reverse,
+        collections::HashMap,
+        marker::PhantomData,
+        sync::{Arc, Mutex},
+        time::{Duration, Instant},
+    },
 };
 
 enum NodeId {
@@ -35,6 +47,12 @@ pub struct ClusterNodes<T> {
     _phantom: PhantomData<T>,
 }
 
+pub struct ClusterNodesCache<T> {
+    #[allow(clippy::type_complexity)]
+    cache: Mutex<LruCache<Epoch, (Instant, Arc<ClusterNodes<T>>)>>,
+    ttl: Duration, // Time to live.
+}
+
 impl Node {
     #[inline]
     fn pubkey(&self) -> Pubkey {
@@ -211,6 +229,77 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
         .collect()
 }
 
+impl<T> ClusterNodesCache<T> {
+    pub fn new(
+        // Capacity of underlying LRU-cache in terms of number of epochs.
+        cap: usize,
+        // A time-to-live eviction policy is enforced to refresh entries in
+        // case gossip contact-infos are updated.
+        ttl: Duration,
+    ) -> Self {
+        Self {
+            cache: Mutex::new(LruCache::new(cap)),
+            ttl,
+        }
+    }
+}
+
+impl ClusterNodesCache<BroadcastStage> {
+    pub fn get(
+        &self,
+        cluster_info: &ClusterInfo,
+        working_bank: &Bank,
+    ) -> Arc<ClusterNodes<BroadcastStage>> {
+        let slot = working_bank.slot();
+        let epoch = working_bank.get_leader_schedule_epoch(slot);
+        {
+            let mut cache = self.cache.lock().unwrap();
+            if let Some((asof, nodes)) = cache.get(&epoch) {
+                if asof.elapsed() < self.ttl {
+                    return Arc::clone(nodes);
+                }
+                cache.pop(&epoch);
+            }
+        }
+        let epoch_staked_nodes = working_bank.epoch_staked_nodes(epoch).unwrap_or_default();
+        let nodes = ClusterNodes::<BroadcastStage>::new(cluster_info, &epoch_staked_nodes);
+        let nodes = Arc::new(nodes);
+        {
+            let mut cache = self.cache.lock().unwrap();
+            cache.put(epoch, (Instant::now(), Arc::clone(&nodes)));
+        }
+        nodes
+    }
+}
+
+impl ClusterNodesCache<RetransmitStage> {
+    pub fn get(
+        &self,
+        shred_slot: Slot,
+        root_bank: &Bank,
+        cluster_info: &ClusterInfo,
+    ) -> Arc<ClusterNodes<RetransmitStage>> {
+        let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
+        {
+            let mut cache = self.cache.lock().unwrap();
+            if let Some((asof, nodes)) = cache.get(&epoch) {
+                if asof.elapsed() < self.ttl {
+                    return Arc::clone(nodes);
+                }
+                cache.pop(&epoch);
+            }
+        }
+        let epoch_staked_nodes = root_bank.epoch_staked_nodes(epoch).unwrap_or_default();
+        let nodes = ClusterNodes::<RetransmitStage>::new(cluster_info, &epoch_staked_nodes);
+        let nodes = Arc::new(nodes);
+        {
+            let mut cache = self.cache.lock().unwrap();
+            cache.put(epoch, (Instant::now(), Arc::clone(&nodes)));
+        }
+        nodes
+    }
+}
+
 impl From<ContactInfo> for NodeId {
     fn from(node: ContactInfo) -> Self {
         NodeId::ContactInfo(node)