Fix bank executor stats and remove copy-on-write semantics (#25621)

* Fix bank executor stats and remove copy-on-write semantics

* Remove clone implementation for CachedExecutors

* feedback
This commit is contained in:
Justin Starry 2022-06-04 08:54:26 -05:00 committed by GitHub
parent 9851774133
commit 61ad8fcc55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 164 additions and 80 deletions

View File

@ -346,18 +346,31 @@ struct CachedExecutorsEntry {
executor: Arc<dyn Executor>,
hit_count: AtomicU64,
}
// Manual `Clone`: `AtomicU64` does not implement `Clone`, so each counter is
// snapshotted with a relaxed load and rewrapped in a fresh atomic.
// NOTE(review): relaxed loads mean a clone taken while another thread updates
// the counters may capture slightly stale counts — presumably acceptable since
// these only feed the LFU eviction heuristic; confirm against callers.
impl Clone for CachedExecutorsEntry {
fn clone(&self) -> Self {
Self {
// Plain integer field; copied directly.
prev_epoch_count: self.prev_epoch_count,
// Snapshot of the current-epoch usage count.
epoch_count: AtomicU64::new(self.epoch_count.load(Relaxed)),
// Cheap refcount bump; the executor itself is shared, not deep-copied.
executor: self.executor.clone(),
hit_count: AtomicU64::new(self.hit_count.load(Relaxed)),
}
}
}
/// LFU Cache of executors with single-epoch memory of usage counts
#[derive(Debug)]
struct CachedExecutors {
max: usize,
capacity: usize,
current_epoch: Epoch,
pub(self) executors: HashMap<Pubkey, CachedExecutorsEntry>,
stats: executor_cache::Stats,
}
impl Default for CachedExecutors {
fn default() -> Self {
Self {
max: MAX_CACHED_EXECUTORS,
capacity: MAX_CACHED_EXECUTORS,
current_epoch: Epoch::default(),
executors: HashMap::default(),
stats: executor_cache::Stats::default(),
@ -375,59 +388,46 @@ impl AbiExample for CachedExecutors {
}
}
// Manual `Clone` for the whole cache: deep-clones every entry (snapshotting the
// atomic counters via relaxed loads) while sharing each executor `Arc`.
// `stats` are deliberately NOT copied — the clone starts with fresh default
// stats so each cache instance accumulates its own hit/miss accounting.
impl Clone for CachedExecutors {
fn clone(&self) -> Self {
let executors = self.executors.iter().map(|(&key, entry)| {
// Rebuild each entry by hand (see the atomic-snapshot note on the
// entry type): counters are loaded once, the executor Arc is shared.
let entry = CachedExecutorsEntry {
prev_epoch_count: entry.prev_epoch_count,
epoch_count: AtomicU64::new(entry.epoch_count.load(Relaxed)),
executor: entry.executor.clone(),
hit_count: AtomicU64::new(entry.hit_count.load(Relaxed)),
};
(key, entry)
});
Self {
max: self.max,
current_epoch: self.current_epoch,
// The lazy iterator above is materialized into the new map here.
executors: executors.collect(),
// Fresh stats for the new instance (intentionally not cloned).
stats: executor_cache::Stats::default(),
}
}
}
impl CachedExecutors {
/// Returns a cache usable for `epoch`.
///
/// If `epoch` matches `current_epoch`, this is just an `Arc` refcount bump —
/// the underlying cache is shared, not copied. Otherwise a new cache is built
/// in which every entry's usage counters are rolled over for the new epoch:
/// `epoch_count` becomes `prev_epoch_count` and resets to 0. The new cache
/// also starts with default (zeroed) stats.
fn clone_with_epoch(self: &Arc<Self>, epoch: Epoch) -> Arc<Self> {
// Same epoch: cheap shared handle, no entry copying.
if self.current_epoch == epoch {
return self.clone();
}
let executors = self.executors.iter().map(|(&key, entry)| {
// The total_count = prev_epoch_count + epoch_count will be used for LFU eviction.
// If the epoch has changed, we store the prev_epoch_count and reset the epoch_count to 0.
let entry = CachedExecutorsEntry {
prev_epoch_count: entry.epoch_count.load(Relaxed),
epoch_count: AtomicU64::default(),
executor: entry.executor.clone(),
hit_count: AtomicU64::new(entry.hit_count.load(Relaxed)),
};
(key, entry)
});
Arc::new(Self {
max: self.max,
current_epoch: epoch,
executors: executors.collect(),
// Stats are not carried over to the epoch-rolled cache.
stats: executor_cache::Stats::default(),
})
}
fn new(max: usize, current_epoch: Epoch) -> Self {
fn new(max_capacity: usize, current_epoch: Epoch) -> Self {
Self {
max,
capacity: max_capacity,
current_epoch,
executors: HashMap::new(),
stats: executor_cache::Stats::default(),
}
}
/// Builds a child bank's executor cache from its parent's.
///
/// If the child is in the same epoch as the parent, the parent's entries are
/// cloned as-is. If the epoch advanced, every entry's counters are rolled
/// over: `epoch_count` is moved into `prev_epoch_count` and reset to 0, so
/// that total_count = prev_epoch_count + epoch_count keeps driving LFU
/// eviction with single-epoch memory. In both cases the child inherits the
/// parent's `capacity` but starts with fresh (zeroed) stats.
fn new_from_parent_bank_executors(
parent_bank_executors: &CachedExecutors,
current_epoch: Epoch,
) -> Self {
let executors = if parent_bank_executors.current_epoch == current_epoch {
// Same epoch: straight deep-clone of the entry map.
parent_bank_executors.executors.clone()
} else {
parent_bank_executors
.executors
.iter()
.map(|(&key, entry)| {
// Epoch rollover: demote this epoch's count to prev, zero the
// new epoch's count, keep the hit count and shared executor.
let entry = CachedExecutorsEntry {
prev_epoch_count: entry.epoch_count.load(Relaxed),
epoch_count: AtomicU64::default(),
executor: entry.executor.clone(),
hit_count: AtomicU64::new(entry.hit_count.load(Relaxed)),
};
(key, entry)
})
.collect()
};
Self {
capacity: parent_bank_executors.capacity,
current_epoch,
executors,
// Per-bank stats: the child never inherits the parent's counters.
stats: executor_cache::Stats::default(),
}
}
fn get(&self, pubkey: &Pubkey) -> Option<Arc<dyn Executor>> {
if let Some(entry) = self.executors.get(pubkey) {
self.stats.hits.fetch_add(1, Relaxed);
@ -469,7 +469,7 @@ impl CachedExecutors {
let primer_counts = Self::get_primer_counts(counts.as_slice(), new_executors.len());
if self.executors.len() >= self.max {
if self.executors.len() >= self.capacity {
let mut least_keys = counts
.iter()
.take(new_executors.len())
@ -1330,9 +1330,7 @@ pub struct Bank {
pub rewards_pool_pubkeys: Arc<HashSet<Pubkey>>,
/// Cached executors
// Inner Arc is meant to implement copy-on-write semantics as opposed to
// sharing mutations (hence RwLock<Arc<...>> instead of Arc<RwLock<...>>).
cached_executors: RwLock<Arc<CachedExecutors>>,
cached_executors: RwLock<CachedExecutors>,
transaction_debug_keys: Option<Arc<HashSet<Pubkey>>>,
@ -1517,7 +1515,7 @@ impl Bank {
cluster_type: Option::<ClusterType>::default(),
lazy_rent_collection: AtomicBool::default(),
rewards_pool_pubkeys: Arc::<HashSet<Pubkey>>::default(),
cached_executors: RwLock::<Arc<CachedExecutors>>::default(),
cached_executors: RwLock::<CachedExecutors>::default(),
transaction_debug_keys: Option::<Arc<HashSet<Pubkey>>>::default(),
transaction_log_collector_config: Arc::<RwLock<TransactionLogCollectorConfig>>::default(
),
@ -1777,8 +1775,11 @@ impl Bank {
let (cached_executors, cached_executors_time) = Measure::this(
|_| {
let cached_executors = parent.cached_executors.read().unwrap();
RwLock::new(cached_executors.clone_with_epoch(epoch))
let parent_bank_executors = parent.cached_executors.read().unwrap();
RwLock::new(CachedExecutors::new_from_parent_bank_executors(
&parent_bank_executors,
epoch,
))
},
(),
"cached_executors_creation",
@ -2205,10 +2206,7 @@ impl Bank {
cluster_type: Some(genesis_config.cluster_type),
lazy_rent_collection: new(),
rewards_pool_pubkeys: new(),
cached_executors: RwLock::new(Arc::new(CachedExecutors::new(
MAX_CACHED_EXECUTORS,
fields.epoch,
))),
cached_executors: RwLock::new(CachedExecutors::new(MAX_CACHED_EXECUTORS, fields.epoch)),
transaction_debug_keys: debug_keys,
transaction_log_collector_config: new(),
transaction_log_collector: new(),
@ -4193,15 +4191,17 @@ impl Bank {
return Rc::new(RefCell::new(Executors::default()));
}
let cache = self.cached_executors.read().unwrap();
let executors = executable_keys
.into_iter()
.filter_map(|key| {
cache
.get(key)
.map(|executor| (*key, TransactionExecutor::new_cached(executor)))
})
.collect();
let executors = {
let cache = self.cached_executors.read().unwrap();
executable_keys
.into_iter()
.filter_map(|key| {
cache
.get(key)
.map(|executor| (*key, TransactionExecutor::new_cached(executor)))
})
.collect()
};
Rc::new(RefCell::new(executors))
}
@ -4229,21 +4229,17 @@ impl Bank {
.collect();
if !dirty_executors.is_empty() {
let mut cache = self.cached_executors.write().unwrap();
let cache = Arc::make_mut(&mut cache);
cache.put(&dirty_executors);
self.cached_executors.write().unwrap().put(&dirty_executors);
}
}
/// Remove an executor from the bank's cache
fn remove_executor(&self, pubkey: &Pubkey) {
let mut cache = self.cached_executors.write().unwrap();
let _ = Arc::make_mut(&mut cache).remove(pubkey);
let _ = self.cached_executors.write().unwrap().remove(pubkey);
}
pub fn clear_executors(&self) {
let mut cache = self.cached_executors.write().unwrap();
Arc::make_mut(&mut cache).clear();
self.cached_executors.write().unwrap().clear();
}
/// Execute a transaction using the provided loaded accounts and update
@ -14664,13 +14660,13 @@ pub(crate) mod tests {
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());
let mut cache = Arc::new(cache).clone_with_epoch(1);
let mut cache = CachedExecutors::new_from_parent_bank_executors(&cache, 1);
assert!(cache.current_epoch == 1);
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key3).is_some());
Arc::make_mut(&mut cache).put(&[(&key4, executor.clone())]);
cache.put(&[(&key4, executor.clone())]);
assert!(cache.get(&key4).is_some());
let num_retained = [&key1, &key2, &key3]
@ -14679,8 +14675,8 @@ pub(crate) mod tests {
.count();
assert_eq!(num_retained, 2);
Arc::make_mut(&mut cache).put(&[(&key1, executor.clone())]);
Arc::make_mut(&mut cache).put(&[(&key3, executor.clone())]);
cache.put(&[(&key1, executor.clone())]);
cache.put(&[(&key3, executor.clone())]);
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key3).is_some());
let num_retained = [&key2, &key4]
@ -14689,10 +14685,10 @@ pub(crate) mod tests {
.count();
assert_eq!(num_retained, 1);
cache = cache.clone_with_epoch(2);
cache = CachedExecutors::new_from_parent_bank_executors(&cache, 2);
assert!(cache.current_epoch == 2);
Arc::make_mut(&mut cache).put(&[(&key3, executor.clone())]);
cache.put(&[(&key3, executor.clone())]);
assert!(cache.get(&key3).is_some());
}
@ -14758,6 +14754,94 @@ pub(crate) mod tests {
assert_eq!(cache.stats.one_hit_wonders.load(Relaxed), 1);
}
// Verifies that every CachedExecutors operation updates the right stats
// counter, and that stats are reset (not inherited) when a child cache is
// derived from a parent — both for same-epoch and next-epoch children.
#[test]
fn test_cached_executors_stats() {
// Plain-value mirror of `executor_cache::Stats` (atomics flattened to u64)
// so the whole stats snapshot can be compared with a single assert_eq.
#[derive(Debug, Default, PartialEq)]
struct ComparableStats {
hits: u64,
misses: u64,
evictions: HashMap<Pubkey, u64>,
insertions: u64,
replacements: u64,
one_hit_wonders: u64,
}
impl From<&executor_cache::Stats> for ComparableStats {
fn from(stats: &executor_cache::Stats) -> Self {
// Exhaustive destructuring: adding a field to Stats breaks this
// test at compile time, forcing the mirror to stay in sync.
let executor_cache::Stats {
hits,
misses,
evictions,
insertions,
replacements,
one_hit_wonders,
} = stats;
ComparableStats {
hits: hits.load(Relaxed),
misses: misses.load(Relaxed),
evictions: evictions.clone(),
insertions: insertions.load(Relaxed),
replacements: replacements.load(Relaxed),
one_hit_wonders: one_hit_wonders.load(Relaxed),
}
}
}
const CURRENT_EPOCH: Epoch = 0;
// Capacity of 2 so a third insertion below forces an eviction.
let mut cache = CachedExecutors::new(2, CURRENT_EPOCH);
let mut expected_stats = ComparableStats::default();
let program_id1 = Pubkey::new_unique();
let program_id2 = Pubkey::new_unique();
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});
// make sure we're starting from where we think we are
assert_eq!(ComparableStats::from(&cache.stats), expected_stats,);
// insert some executors
cache.put(&[(&program_id1, executor.clone())]);
cache.put(&[(&program_id2, executor.clone())]);
expected_stats.insertions += 2;
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);
// replace a one-hit-wonder executor
// (program_id1 was never hit via get(), so re-putting it counts as
// replacing a one-hit wonder as well as a replacement)
cache.put(&[(&program_id1, executor.clone())]);
expected_stats.replacements += 1;
expected_stats.one_hit_wonders += 1;
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);
// hit some executors
cache.get(&program_id1);
cache.get(&program_id1);
cache.get(&program_id2);
expected_stats.hits += 3;
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);
// miss an executor
cache.get(&Pubkey::new_unique());
expected_stats.misses += 1;
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);
// evict an executor
// (cache is at capacity 2; program_id2 is the LFU victim here)
cache.put(&[(&Pubkey::new_unique(), executor.clone())]);
expected_stats.insertions += 1;
expected_stats.evictions.insert(program_id2, 1);
assert_eq!(ComparableStats::from(&cache.stats), expected_stats);
// make sure stats are cleared in new_from_parent
// (checked for both the same-epoch and the epoch-rollover code paths)
assert_eq!(
ComparableStats::from(
&CachedExecutors::new_from_parent_bank_executors(&cache, CURRENT_EPOCH).stats
),
ComparableStats::default()
);
assert_eq!(
ComparableStats::from(
&CachedExecutors::new_from_parent_bank_executors(&cache, CURRENT_EPOCH + 1).stats
),
ComparableStats::default()
);
}
#[test]
fn test_bank_executor_cache() {
solana_logger::setup();