uses RwLock instead of Mutex for cache entries in ClusterNodesCache (#32686)
For as long as cache entries have not expired yet, a shared lock is enough and would avoid lock contention.
This commit is contained in:
parent
4dc6eb14e0
commit
4e4b0a361e
|
@ -30,8 +30,7 @@ use {
|
||||||
iter::repeat_with,
|
iter::repeat_with,
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
ops::Deref,
|
sync::{Arc, Mutex, RwLock},
|
||||||
sync::{Arc, Mutex},
|
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
},
|
},
|
||||||
thiserror::Error,
|
thiserror::Error,
|
||||||
|
@ -72,9 +71,9 @@ pub struct ClusterNodes<T> {
|
||||||
type CacheEntry<T> = Option<(/*as of:*/ Instant, Arc<ClusterNodes<T>>)>;
|
type CacheEntry<T> = Option<(/*as of:*/ Instant, Arc<ClusterNodes<T>>)>;
|
||||||
|
|
||||||
pub struct ClusterNodesCache<T> {
|
pub struct ClusterNodesCache<T> {
|
||||||
// Cache entries are wrapped in Arc<Mutex<...>>, so that, when needed, only
|
// Cache entries are wrapped in Arc<RwLock<...>>, so that, when needed, only
|
||||||
// one thread does the computations to update the entry for the epoch.
|
// one thread does the computations to update the entry for the epoch.
|
||||||
cache: Mutex<LruCache<Epoch, Arc<Mutex<CacheEntry<T>>>>>,
|
cache: Mutex<LruCache<Epoch, Arc<RwLock<CacheEntry<T>>>>>,
|
||||||
ttl: Duration, // Time to live.
|
ttl: Duration, // Time to live.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -343,7 +342,7 @@ impl<T> ClusterNodesCache<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: 'static> ClusterNodesCache<T> {
|
impl<T: 'static> ClusterNodesCache<T> {
|
||||||
fn get_cache_entry(&self, epoch: Epoch) -> Arc<Mutex<CacheEntry<T>>> {
|
fn get_cache_entry(&self, epoch: Epoch) -> Arc<RwLock<CacheEntry<T>>> {
|
||||||
let mut cache = self.cache.lock().unwrap();
|
let mut cache = self.cache.lock().unwrap();
|
||||||
match cache.get(&epoch) {
|
match cache.get(&epoch) {
|
||||||
Some(entry) => Arc::clone(entry),
|
Some(entry) => Arc::clone(entry),
|
||||||
|
@ -364,13 +363,19 @@ impl<T: 'static> ClusterNodesCache<T> {
|
||||||
) -> Arc<ClusterNodes<T>> {
|
) -> Arc<ClusterNodes<T>> {
|
||||||
let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
|
let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
|
||||||
let entry = self.get_cache_entry(epoch);
|
let entry = self.get_cache_entry(epoch);
|
||||||
|
if let Some((_, nodes)) = entry
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.as_ref()
|
||||||
|
.filter(|(asof, _)| asof.elapsed() < self.ttl)
|
||||||
|
{
|
||||||
|
return nodes.clone();
|
||||||
|
}
|
||||||
// Hold the lock on the entry here so that, if needed, only
|
// Hold the lock on the entry here so that, if needed, only
|
||||||
// one thread recomputes cluster-nodes for this epoch.
|
// one thread recomputes cluster-nodes for this epoch.
|
||||||
let mut entry = entry.lock().unwrap();
|
let mut entry = entry.write().unwrap();
|
||||||
if let Some((asof, nodes)) = entry.deref() {
|
if let Some((_, nodes)) = entry.as_ref().filter(|(asof, _)| asof.elapsed() < self.ttl) {
|
||||||
if asof.elapsed() < self.ttl {
|
return nodes.clone();
|
||||||
return Arc::clone(nodes);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
let epoch_staked_nodes = [root_bank, working_bank]
|
let epoch_staked_nodes = [root_bank, working_bank]
|
||||||
.iter()
|
.iter()
|
||||||
|
|
Loading…
Reference in New Issue