diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs
index 1e57fdda42..8e45ebb6a1 100644
--- a/turbine/src/cluster_nodes.rs
+++ b/turbine/src/cluster_nodes.rs
@@ -30,8 +30,7 @@ use {
         iter::repeat_with,
         marker::PhantomData,
         net::SocketAddr,
-        ops::Deref,
-        sync::{Arc, Mutex},
+        sync::{Arc, Mutex, RwLock},
         time::{Duration, Instant},
     },
     thiserror::Error,
@@ -72,9 +71,9 @@ pub struct ClusterNodes<T> {
 type CacheEntry<T> = Option<(/*as of:*/ Instant, Arc<ClusterNodes<T>>)>;
 
 pub struct ClusterNodesCache<T> {
-    // Cache entries are wrapped in Arc<Mutex<_>>, so that, when needed, only
+    // Cache entries are wrapped in Arc<RwLock<_>>, so that, when needed, only
     // one thread does the computations to update the entry for the epoch.
-    cache: Mutex<LruCache<Epoch, Arc<Mutex<CacheEntry<T>>>>>,
+    cache: Mutex<LruCache<Epoch, Arc<RwLock<CacheEntry<T>>>>>,
     ttl: Duration, // Time to live.
 }
 
@@ -343,7 +342,7 @@ impl<T> ClusterNodesCache<T> {
 }
 
 impl<T: 'static> ClusterNodesCache<T> {
-    fn get_cache_entry(&self, epoch: Epoch) -> Arc<Mutex<CacheEntry<T>>> {
+    fn get_cache_entry(&self, epoch: Epoch) -> Arc<RwLock<CacheEntry<T>>> {
         let mut cache = self.cache.lock().unwrap();
         match cache.get(&epoch) {
             Some(entry) => Arc::clone(entry),
@@ -364,13 +363,19 @@
     ) -> Arc<ClusterNodes<T>> {
         let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
         let entry = self.get_cache_entry(epoch);
+        if let Some((_, nodes)) = entry
+            .read()
+            .unwrap()
+            .as_ref()
+            .filter(|(asof, _)| asof.elapsed() < self.ttl)
+        {
+            return nodes.clone();
+        }
         // Hold the lock on the entry here so that, if needed, only
         // one thread recomputes cluster-nodes for this epoch.
-        let mut entry = entry.lock().unwrap();
-        if let Some((asof, nodes)) = entry.deref() {
-            if asof.elapsed() < self.ttl {
-                return Arc::clone(nodes);
-            }
+        let mut entry = entry.write().unwrap();
+        if let Some((_, nodes)) = entry.as_ref().filter(|(asof, _)| asof.elapsed() < self.ttl) {
+            return nodes.clone();
         }
         let epoch_staked_nodes = [root_bank, working_bank]
             .iter()
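
The diff swaps the per-epoch cache entry from `Arc<Mutex<CacheEntry<T>>>` to `Arc<RwLock<CacheEntry<T>>>` and adds a read-locked fast path, so concurrent readers of a fresh entry no longer serialize on an exclusive lock; the write lock is taken, and freshness re-checked, only when the entry is missing or past its TTL. Below is a minimal, self-contained sketch of that double-checked read/write-lock pattern, detached from the agave types: the `TtlCell` type and `get_or_recompute` helper are hypothetical names for illustration, not part of the codebase.

```rust
use std::{
    sync::{Arc, RwLock},
    time::{Duration, Instant},
};

// A cached value tagged with the instant at which it was computed.
type Entry<T> = Option<(/*as of:*/ Instant, Arc<T>)>;

// Hypothetical container illustrating the pattern; not an agave type.
struct TtlCell<T> {
    entry: RwLock<Entry<T>>,
    ttl: Duration, // Time to live.
}

impl<T> TtlCell<T> {
    fn new(ttl: Duration) -> Self {
        Self {
            entry: RwLock::new(None),
            ttl,
        }
    }

    fn get_or_recompute(&self, compute: impl FnOnce() -> T) -> Arc<T> {
        // Fast path: shared read lock. Many threads can serve a fresh
        // entry concurrently instead of serializing on an exclusive lock.
        if let Some((_, value)) = self
            .entry
            .read()
            .unwrap()
            .as_ref()
            .filter(|(asof, _)| asof.elapsed() < self.ttl)
        {
            return Arc::clone(value);
        }
        // Slow path: exclusive write lock, then re-check freshness, since
        // another thread may have refreshed the entry while we waited.
        let mut entry = self.entry.write().unwrap();
        if let Some((_, value)) = entry.as_ref().filter(|(asof, _)| asof.elapsed() < self.ttl) {
            return Arc::clone(value);
        }
        // Still stale (or empty): recompute while holding the write lock,
        // so only one thread does the work.
        let value = Arc::new(compute());
        *entry = Some((Instant::now(), Arc::clone(&value)));
        value
    }
}

fn main() {
    let cell = TtlCell::new(Duration::from_secs(5));
    // First call computes; calls within the TTL only take the read lock.
    let first = cell.get_or_recompute(|| 42u64);
    let second = cell.get_or_recompute(|| unreachable!());
    assert_eq!(*first, *second);
}
```

The re-check under the write lock is what keeps redundant recomputation out: between releasing the read lock and acquiring the write lock, another thread may already have refreshed the entry, exactly as in the patched `get` above.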