implements cluster-nodes cache

Cluster nodes are cached keyed by the epoch from which stakes are
obtained, so when the epoch changes the cluster nodes are recomputed.

A time-to-live eviction policy is enforced to refresh entries in case
gossip contact-infos are updated.
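
The pattern the diff below introduces boils down to an epoch-keyed map whose
entries also record when they were computed. The following standalone sketch
shows just that lookup-or-recompute logic; the names (Cache,
get_or_insert_with) are illustrative only, and the actual code in this commit
stores Arc<ClusterNodes<T>> values in an lru::LruCache behind a Mutex.

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    time::{Duration, Instant},
};

type Epoch = u64;

// Illustrative stand-in for ClusterNodesCache<T>: entries are keyed by
// epoch and carry the instant at which they were computed.
struct Cache<V> {
    entries: Mutex<HashMap<Epoch, (Instant, Arc<V>)>>,
    ttl: Duration, // Time to live.
}

impl<V> Cache<V> {
    fn new(ttl: Duration) -> Self {
        Self {
            entries: Mutex::new(HashMap::new()),
            ttl,
        }
    }

    // Return the cached value for `epoch` if it is younger than the TTL;
    // otherwise drop it, recompute outside the lock, and store the result.
    fn get_or_insert_with(&self, epoch: Epoch, compute: impl FnOnce() -> V) -> Arc<V> {
        {
            let mut entries = self.entries.lock().unwrap();
            if let Some((asof, value)) = entries.get(&epoch) {
                if asof.elapsed() < self.ttl {
                    return Arc::clone(value);
                }
                entries.remove(&epoch);
            }
        }
        let value = Arc::new(compute());
        self.entries
            .lock()
            .unwrap()
            .insert(epoch, (Instant::now(), Arc::clone(&value)));
        value
    }
}

The two get methods added in the diff follow this shape, differing only in how
they derive the epoch (from the working bank's slot for retransmit, from the
shred slot and the root bank for broadcast) and in how the nodes are recomputed
from gossip and the epoch's staked nodes.
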
behzad nouri 2021-07-28 10:55:31 -04:00
parent 44b11154ca
commit ecc1c7957f
1 changed file with 91 additions and 2 deletions

@@ -1,14 +1,26 @@
 use {
     crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
     itertools::Itertools,
+    lru::LruCache,
     solana_gossip::{
         cluster_info::{compute_retransmit_peers, ClusterInfo},
         contact_info::ContactInfo,
         crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
         weighted_shuffle::{weighted_best, weighted_shuffle},
     },
-    solana_sdk::pubkey::Pubkey,
-    std::{any::TypeId, cmp::Reverse, collections::HashMap, marker::PhantomData},
+    solana_runtime::bank::Bank,
+    solana_sdk::{
+        clock::{Epoch, Slot},
+        pubkey::Pubkey,
+    },
+    std::{
+        any::TypeId,
+        cmp::Reverse,
+        collections::HashMap,
+        marker::PhantomData,
+        sync::{Arc, Mutex},
+        time::{Duration, Instant},
+    },
 };
 
 enum NodeId {
@@ -35,6 +47,12 @@ pub struct ClusterNodes<T> {
     _phantom: PhantomData<T>,
 }
 
+pub struct ClusterNodesCache<T> {
+    #[allow(clippy::type_complexity)]
+    cache: Mutex<LruCache<Epoch, (Instant, Arc<ClusterNodes<T>>)>>,
+    ttl: Duration, // Time to live.
+}
+
 impl Node {
     #[inline]
     fn pubkey(&self) -> Pubkey {
@@ -211,6 +229,77 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
         .collect()
 }
 
+impl<T> ClusterNodesCache<T> {
+    pub fn new(
+        // Capacity of underlying LRU-cache in terms of number of epochs.
+        cap: usize,
+        // A time-to-live eviction policy is enforced to refresh entries in
+        // case gossip contact-infos are updated.
+        ttl: Duration,
+    ) -> Self {
+        Self {
+            cache: Mutex::new(LruCache::new(cap)),
+            ttl,
+        }
+    }
+}
+
+impl ClusterNodesCache<RetransmitStage> {
+    pub fn get(
+        &self,
+        cluster_info: &ClusterInfo,
+        working_bank: &Bank,
+    ) -> Arc<ClusterNodes<RetransmitStage>> {
+        let slot = working_bank.slot();
+        let epoch = working_bank.get_leader_schedule_epoch(slot);
+        {
+            let mut cache = self.cache.lock().unwrap();
+            if let Some((asof, nodes)) = cache.get(&epoch) {
+                if asof.elapsed() < self.ttl {
+                    return Arc::clone(nodes);
+                }
+                cache.pop(&epoch);
+            }
+        }
+        let epoch_staked_nodes = working_bank.epoch_staked_nodes(epoch).unwrap_or_default();
+        let nodes = ClusterNodes::<RetransmitStage>::new(cluster_info, &epoch_staked_nodes);
+        let nodes = Arc::new(nodes);
+        {
+            let mut cache = self.cache.lock().unwrap();
+            cache.put(epoch, (Instant::now(), Arc::clone(&nodes)));
+        }
+        nodes
+    }
+}
+
+impl ClusterNodesCache<BroadcastStage> {
+    pub fn get(
+        &self,
+        shred_slot: Slot,
+        root_bank: &Bank,
+        cluster_info: &ClusterInfo,
+    ) -> Arc<ClusterNodes<BroadcastStage>> {
+        let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
+        {
+            let mut cache = self.cache.lock().unwrap();
+            if let Some((asof, nodes)) = cache.get(&epoch) {
+                if asof.elapsed() < self.ttl {
+                    return Arc::clone(nodes);
+                }
+                cache.pop(&epoch);
+            }
+        }
+        let epoch_staked_nodes = root_bank.epoch_staked_nodes(epoch).unwrap_or_default();
+        let nodes = ClusterNodes::<BroadcastStage>::new(cluster_info, &epoch_staked_nodes);
+        let nodes = Arc::new(nodes);
+        {
+            let mut cache = self.cache.lock().unwrap();
+            cache.put(epoch, (Instant::now(), Arc::clone(&nodes)));
+        }
+        nodes
+    }
+}
+
 impl From<ContactInfo> for NodeId {
     fn from(node: ContactInfo) -> Self {
         NodeId::ContactInfo(node)