From 2fc8e533a2585ce96cb6b3a1a85917ac09a81865 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Mei=C3=9Fner?= Date: Mon, 10 Oct 2022 15:01:41 +0200 Subject: [PATCH] Refactor - Move `executor_cache` to program-runtime crate (#28322) * Moves CachedExecutors, related structs, consts and tests into the program-runtime crate. * Moves TransactionExecutor, related enum and type defs into executor_cache mod. --- Cargo.lock | 1 + program-runtime/Cargo.toml | 1 + program-runtime/src/executor_cache.rs | 637 ++++++++++++++++++++++++++ program-runtime/src/invoke_context.rs | 74 +-- program-runtime/src/lib.rs | 1 + programs/bpf/Cargo.lock | 1 + programs/bpf_loader/src/lib.rs | 3 +- runtime/src/accounts.rs | 2 +- runtime/src/bank.rs | 534 +-------------------- runtime/src/message_processor.rs | 3 +- 10 files changed, 652 insertions(+), 605 deletions(-) create mode 100644 program-runtime/src/executor_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 5b17a1cab..43d921659 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5852,6 +5852,7 @@ dependencies = [ "log", "num-derive", "num-traits", + "rand 0.7.3", "rustc_version 0.4.0", "serde", "solana-frozen-abi 1.15.0", diff --git a/program-runtime/Cargo.toml b/program-runtime/Cargo.toml index ae060e5fa..aae3ff0cd 100644 --- a/program-runtime/Cargo.toml +++ b/program-runtime/Cargo.toml @@ -20,6 +20,7 @@ libloading = "0.7.0" log = "0.4.14" num-derive = { version = "0.3" } num-traits = { version = "0.2" } +rand = "0.7.0" serde = { version = "1.0.129", features = ["derive", "rc"] } solana-frozen-abi = { path = "../frozen-abi", version = "=1.15.0" } solana-frozen-abi-macro = { path = "../frozen-abi/macro", version = "=1.15.0" } diff --git a/program-runtime/src/executor_cache.rs b/program-runtime/src/executor_cache.rs new file mode 100644 index 000000000..ef3f23a8f --- /dev/null +++ b/program-runtime/src/executor_cache.rs @@ -0,0 +1,637 @@ +use { + crate::invoke_context::InvokeContext, + log::*, + rand::Rng, + solana_sdk::{ + instruction::InstructionError, pubkey::Pubkey, saturating_add_assign, slot_history::Slot, + stake_history::Epoch, transaction_context::IndexOfAccount, + }, + std::{ + collections::HashMap, + fmt::Debug, + ops::Div, + sync::{ + atomic::{AtomicU64, Ordering::Relaxed}, + Arc, + }, + }, +}; + +/// Program executor +pub trait Executor: Debug + Send + Sync { + /// Execute the program + fn execute( + &self, + first_instruction_account: IndexOfAccount, + invoke_context: &mut InvokeContext, + ) -> Result<(), InstructionError>; +} + +pub type Executors = HashMap; + +#[repr(u8)] +#[derive(PartialEq, Debug)] +enum TransactionExecutorStatus { + /// Executor was already in the cache, no update needed + Cached, + /// Executor was missing from the cache, but not updated + Missing, + /// Executor is for an updated program + Updated, +} + +/// Tracks whether a given executor is "dirty" and needs to updated in the +/// executors cache +#[derive(Debug)] +pub struct TransactionExecutor { + pub(crate) executor: Arc, + status: TransactionExecutorStatus, +} + +impl TransactionExecutor { + /// Wraps an executor and tracks that it doesn't need to be updated in the + /// executors cache. + pub fn new_cached(executor: Arc) -> Self { + Self { + executor, + status: TransactionExecutorStatus::Cached, + } + } + + /// Wraps an executor and tracks that it needs to be updated in the + /// executors cache. + pub fn new_miss(executor: Arc) -> Self { + Self { + executor, + status: TransactionExecutorStatus::Missing, + } + } + + /// Wraps an executor and tracks that it needs to be updated in the + /// executors cache only if the transaction succeeded. + pub fn new_updated(executor: Arc) -> Self { + Self { + executor, + status: TransactionExecutorStatus::Updated, + } + } + + pub fn is_missing(&self) -> bool { + self.status == TransactionExecutorStatus::Missing + } + + pub fn is_updated(&self) -> bool { + self.status == TransactionExecutorStatus::Updated + } + + pub fn get(&self) -> Arc { + self.executor.clone() + } +} + +/// Capacity of `CachedExecutors` +pub const MAX_CACHED_EXECUTORS: usize = 256; + +/// An `Executor` and its statistics tracked in `CachedExecutors` +#[derive(Debug)] +pub struct CachedExecutorsEntry { + prev_epoch_count: u64, + epoch_count: AtomicU64, + executor: Arc, + pub hit_count: AtomicU64, +} + +impl Clone for CachedExecutorsEntry { + fn clone(&self) -> Self { + Self { + prev_epoch_count: self.prev_epoch_count, + epoch_count: AtomicU64::new(self.epoch_count.load(Relaxed)), + executor: self.executor.clone(), + hit_count: AtomicU64::new(self.hit_count.load(Relaxed)), + } + } +} + +/// LFU Cache of executors with single-epoch memory of usage counts +#[derive(Debug)] +pub struct CachedExecutors { + capacity: usize, + current_epoch: Epoch, + pub executors: HashMap, + pub stats: Stats, +} + +impl Default for CachedExecutors { + fn default() -> Self { + Self { + capacity: MAX_CACHED_EXECUTORS, + current_epoch: Epoch::default(), + executors: HashMap::default(), + stats: Stats::default(), + } + } +} + +#[cfg(RUSTC_WITH_SPECIALIZATION)] +impl solana_frozen_abi::abi_example::AbiExample for CachedExecutors { + fn example() -> Self { + // Delegate AbiExample impl to Default before going deep and stuck with + // not easily impl-able Arc due to rust's coherence issue + // This is safe because CachedExecutors isn't serializable by definition. + Self::default() + } +} + +impl CachedExecutors { + pub fn new(max_capacity: usize, current_epoch: Epoch) -> Self { + Self { + capacity: max_capacity, + current_epoch, + executors: HashMap::new(), + stats: Stats::default(), + } + } + + pub fn new_from_parent_bank_executors( + parent_bank_executors: &CachedExecutors, + current_epoch: Epoch, + ) -> Self { + let executors = if parent_bank_executors.current_epoch == current_epoch { + parent_bank_executors.executors.clone() + } else { + parent_bank_executors + .executors + .iter() + .map(|(&key, entry)| { + let entry = CachedExecutorsEntry { + prev_epoch_count: entry.epoch_count.load(Relaxed), + epoch_count: AtomicU64::default(), + executor: entry.executor.clone(), + hit_count: AtomicU64::new(entry.hit_count.load(Relaxed)), + }; + (key, entry) + }) + .collect() + }; + + Self { + capacity: parent_bank_executors.capacity, + current_epoch, + executors, + stats: Stats::default(), + } + } + + pub fn get(&self, pubkey: &Pubkey) -> Option> { + if let Some(entry) = self.executors.get(pubkey) { + self.stats.hits.fetch_add(1, Relaxed); + entry.epoch_count.fetch_add(1, Relaxed); + entry.hit_count.fetch_add(1, Relaxed); + Some(entry.executor.clone()) + } else { + self.stats.misses.fetch_add(1, Relaxed); + None + } + } + + pub fn put(&mut self, executors: &[(&Pubkey, Arc)]) { + let mut new_executors: Vec<_> = executors + .iter() + .filter_map(|(key, executor)| { + if let Some(mut entry) = self.remove(key) { + self.stats.replacements.fetch_add(1, Relaxed); + entry.executor = executor.clone(); + let _ = self.executors.insert(**key, entry); + None + } else { + self.stats.insertions.fetch_add(1, Relaxed); + Some((*key, executor)) + } + }) + .collect(); + + if !new_executors.is_empty() { + let mut counts = self + .executors + .iter() + .map(|(key, entry)| { + let count = entry + .prev_epoch_count + .saturating_add(entry.epoch_count.load(Relaxed)); + (key, count) + }) + .collect::>(); + counts.sort_unstable_by_key(|(_, count)| *count); + + let primer_counts = Self::get_primer_counts(counts.as_slice(), new_executors.len()); + + if self.executors.len() >= self.capacity { + let mut least_keys = counts + .iter() + .take(new_executors.len()) + .map(|least| *least.0) + .collect::>(); + for least_key in least_keys.drain(..) { + let _ = self.remove(&least_key); + self.stats + .evictions + .entry(least_key) + .and_modify(|c| saturating_add_assign!(*c, 1)) + .or_insert(1); + } + } + + for ((key, executor), primer_count) in new_executors.drain(..).zip(primer_counts) { + let entry = CachedExecutorsEntry { + prev_epoch_count: 0, + epoch_count: AtomicU64::new(primer_count), + executor: executor.clone(), + hit_count: AtomicU64::new(1), + }; + let _ = self.executors.insert(*key, entry); + } + } + } + + pub fn remove(&mut self, pubkey: &Pubkey) -> Option { + let maybe_entry = self.executors.remove(pubkey); + if let Some(entry) = maybe_entry.as_ref() { + if entry.hit_count.load(Relaxed) == 1 { + self.stats.one_hit_wonders.fetch_add(1, Relaxed); + } + } + maybe_entry + } + + pub fn clear(&mut self) { + *self = CachedExecutors::default(); + } + + pub fn get_primer_count_upper_bound_inclusive(counts: &[(&Pubkey, u64)]) -> u64 { + const PRIMER_COUNT_TARGET_PERCENTILE: u64 = 85; + #[allow(clippy::assertions_on_constants)] + { + assert!(PRIMER_COUNT_TARGET_PERCENTILE <= 100); + } + // Executor use-frequencies are assumed to fit a Pareto distribution. Choose an + // upper-bound for our primer count as the actual count at the target rank to avoid + // an upward bias + + let target_index = u64::try_from(counts.len().saturating_sub(1)) + .ok() + .and_then(|counts| { + let index = counts + .saturating_mul(PRIMER_COUNT_TARGET_PERCENTILE) + .div(100); // switch to u64::saturating_div once stable + usize::try_from(index).ok() + }) + .unwrap_or(0); + + counts + .get(target_index) + .map(|(_, count)| *count) + .unwrap_or(0) + } + + pub fn get_primer_counts(counts: &[(&Pubkey, u64)], num_counts: usize) -> Vec { + let max_primer_count = Self::get_primer_count_upper_bound_inclusive(counts); + let mut rng = rand::thread_rng(); + + (0..num_counts) + .map(|_| rng.gen_range(0, max_primer_count.saturating_add(1))) + .collect::>() + } +} + +/// Statistics of the entrie `CachedExecutors` +#[derive(Debug, Default)] +pub struct Stats { + pub hits: AtomicU64, + pub misses: AtomicU64, + pub evictions: HashMap, + pub insertions: AtomicU64, + pub replacements: AtomicU64, + pub one_hit_wonders: AtomicU64, +} + +impl Stats { + /// Logs the measurement values + pub fn submit(&self, slot: Slot) { + let hits = self.hits.load(Relaxed); + let misses = self.misses.load(Relaxed); + let insertions = self.insertions.load(Relaxed); + let replacements = self.replacements.load(Relaxed); + let one_hit_wonders = self.one_hit_wonders.load(Relaxed); + let evictions: u64 = self.evictions.values().sum(); + datapoint_info!( + "bank-executor-cache-stats", + ("slot", slot, i64), + ("hits", hits, i64), + ("misses", misses, i64), + ("evictions", evictions, i64), + ("insertions", insertions, i64), + ("replacements", replacements, i64), + ("one_hit_wonders", one_hit_wonders, i64), + ); + debug!( + "Executor Cache Stats -- Hits: {}, Misses: {}, Evictions: {}, Insertions: {}, Replacements: {}, One-Hit-Wonders: {}", + hits, misses, evictions, insertions, replacements, one_hit_wonders, + ); + if log_enabled!(log::Level::Trace) && !self.evictions.is_empty() { + let mut evictions = self.evictions.iter().collect::>(); + evictions.sort_by_key(|e| e.1); + let evictions = evictions + .into_iter() + .rev() + .map(|(program_id, evictions)| { + format!(" {:<44} {}", program_id.to_string(), evictions) + }) + .collect::>(); + let evictions = evictions.join("\n"); + trace!( + "Eviction Details:\n {:<44} {}\n{}", + "Program", + "Count", + evictions + ); + } + } +} + +#[allow(clippy::indexing_slicing)] +#[cfg(test)] +mod tests { + use { + super::*, crate::invoke_context::InvokeContext, solana_sdk::instruction::InstructionError, + }; + + #[derive(Debug)] + struct TestExecutor {} + impl Executor for TestExecutor { + fn execute( + &self, + _first_instruction_account: IndexOfAccount, + _invoke_context: &mut InvokeContext, + ) -> std::result::Result<(), InstructionError> { + Ok(()) + } + } + + #[test] + fn test_cached_executors() { + let key1 = solana_sdk::pubkey::new_rand(); + let key2 = solana_sdk::pubkey::new_rand(); + let key3 = solana_sdk::pubkey::new_rand(); + let key4 = solana_sdk::pubkey::new_rand(); + let executor: Arc = Arc::new(TestExecutor {}); + let mut cache = CachedExecutors::new(3, 0); + + cache.put(&[(&key1, executor.clone())]); + cache.put(&[(&key2, executor.clone())]); + cache.put(&[(&key3, executor.clone())]); + assert!(cache.get(&key1).is_some()); + assert!(cache.get(&key2).is_some()); + assert!(cache.get(&key3).is_some()); + + assert!(cache.get(&key1).is_some()); + assert!(cache.get(&key1).is_some()); + assert!(cache.get(&key2).is_some()); + cache.put(&[(&key4, executor.clone())]); + assert!(cache.get(&key4).is_some()); + let num_retained = [&key1, &key2, &key3] + .iter() + .filter_map(|key| cache.get(key)) + .count(); + assert_eq!(num_retained, 2); + + assert!(cache.get(&key4).is_some()); + assert!(cache.get(&key4).is_some()); + assert!(cache.get(&key4).is_some()); + cache.put(&[(&key3, executor.clone())]); + assert!(cache.get(&key3).is_some()); + let num_retained = [&key1, &key2, &key4] + .iter() + .filter_map(|key| cache.get(key)) + .count(); + assert_eq!(num_retained, 2); + } + + #[test] + fn test_cached_executor_eviction() { + let key1 = solana_sdk::pubkey::new_rand(); + let key2 = solana_sdk::pubkey::new_rand(); + let key3 = solana_sdk::pubkey::new_rand(); + let key4 = solana_sdk::pubkey::new_rand(); + let executor: Arc = Arc::new(TestExecutor {}); + let mut cache = CachedExecutors::new(3, 0); + assert!(cache.current_epoch == 0); + + cache.put(&[(&key1, executor.clone())]); + cache.put(&[(&key2, executor.clone())]); + cache.put(&[(&key3, executor.clone())]); + assert!(cache.get(&key1).is_some()); + assert!(cache.get(&key1).is_some()); + assert!(cache.get(&key1).is_some()); + + let mut cache = CachedExecutors::new_from_parent_bank_executors(&cache, 1); + assert!(cache.current_epoch == 1); + + assert!(cache.get(&key2).is_some()); + assert!(cache.get(&key2).is_some()); + assert!(cache.get(&key3).is_some()); + cache.put(&[(&key4, executor.clone())]); + + assert!(cache.get(&key4).is_some()); + let num_retained = [&key1, &key2, &key3] + .iter() + .filter_map(|key| cache.get(key)) + .count(); + assert_eq!(num_retained, 2); + + cache.put(&[(&key1, executor.clone())]); + cache.put(&[(&key3, executor.clone())]); + assert!(cache.get(&key1).is_some()); + assert!(cache.get(&key3).is_some()); + let num_retained = [&key2, &key4] + .iter() + .filter_map(|key| cache.get(key)) + .count(); + assert_eq!(num_retained, 1); + + cache = CachedExecutors::new_from_parent_bank_executors(&cache, 2); + assert!(cache.current_epoch == 2); + + cache.put(&[(&key3, executor.clone())]); + assert!(cache.get(&key3).is_some()); + } + + #[test] + fn test_cached_executors_evicts_smallest() { + let key1 = solana_sdk::pubkey::new_rand(); + let key2 = solana_sdk::pubkey::new_rand(); + let key3 = solana_sdk::pubkey::new_rand(); + let executor: Arc = Arc::new(TestExecutor {}); + let mut cache = CachedExecutors::new(2, 0); + + cache.put(&[(&key1, executor.clone())]); + for _ in 0..5 { + let _ = cache.get(&key1); + } + cache.put(&[(&key2, executor.clone())]); + // make key1's use-count for sure greater than key2's + let _ = cache.get(&key1); + + let mut entries = cache + .executors + .iter() + .map(|(k, v)| (*k, v.epoch_count.load(Relaxed))) + .collect::>(); + entries.sort_by_key(|(_, v)| *v); + assert!(entries[0].1 < entries[1].1); + + cache.put(&[(&key3, executor.clone())]); + assert!(cache.get(&entries[0].0).is_none()); + assert!(cache.get(&entries[1].0).is_some()); + } + + #[test] + fn test_cached_executors_one_hit_wonder_counter() { + let mut cache = CachedExecutors::new(1, 0); + + let one_hit_wonder = Pubkey::new_unique(); + let popular = Pubkey::new_unique(); + let executor: Arc = Arc::new(TestExecutor {}); + + // make sure we're starting from where we think we are + assert_eq!(cache.stats.one_hit_wonders.load(Relaxed), 0); + + // add our one-hit-wonder + cache.put(&[(&one_hit_wonder, executor.clone())]); + assert_eq!(cache.executors[&one_hit_wonder].hit_count.load(Relaxed), 1); + // displace the one-hit-wonder with "popular program" + cache.put(&[(&popular, executor.clone())]); + assert_eq!(cache.executors[&popular].hit_count.load(Relaxed), 1); + + // one-hit-wonder counter incremented + assert_eq!(cache.stats.one_hit_wonders.load(Relaxed), 1); + + // make "popular program" popular + cache.get(&popular).unwrap(); + assert_eq!(cache.executors[&popular].hit_count.load(Relaxed), 2); + + // evict "popular program" + cache.put(&[(&one_hit_wonder, executor.clone())]); + assert_eq!(cache.executors[&one_hit_wonder].hit_count.load(Relaxed), 1); + + // one-hit-wonder counter not incremented + assert_eq!(cache.stats.one_hit_wonders.load(Relaxed), 1); + } + + #[test] + fn test_executor_cache_get_primer_count_upper_bound_inclusive() { + let pubkey = Pubkey::default(); + let v = []; + assert_eq!( + CachedExecutors::get_primer_count_upper_bound_inclusive(&v), + 0 + ); + let v = [(&pubkey, 1)]; + assert_eq!( + CachedExecutors::get_primer_count_upper_bound_inclusive(&v), + 1 + ); + let v = (0u64..10).map(|i| (&pubkey, i)).collect::>(); + assert_eq!( + CachedExecutors::get_primer_count_upper_bound_inclusive(v.as_slice()), + 7 + ); + } + + #[test] + fn test_cached_executors_stats() { + #[derive(Debug, Default, PartialEq)] + struct ComparableStats { + hits: u64, + misses: u64, + evictions: HashMap, + insertions: u64, + replacements: u64, + one_hit_wonders: u64, + } + impl From<&Stats> for ComparableStats { + fn from(stats: &Stats) -> Self { + let Stats { + hits, + misses, + evictions, + insertions, + replacements, + one_hit_wonders, + } = stats; + ComparableStats { + hits: hits.load(Relaxed), + misses: misses.load(Relaxed), + evictions: evictions.clone(), + insertions: insertions.load(Relaxed), + replacements: replacements.load(Relaxed), + one_hit_wonders: one_hit_wonders.load(Relaxed), + } + } + } + + const CURRENT_EPOCH: Epoch = 0; + let mut cache = CachedExecutors::new(2, CURRENT_EPOCH); + let mut expected_stats = ComparableStats::default(); + + let program_id1 = Pubkey::new_unique(); + let program_id2 = Pubkey::new_unique(); + let executor: Arc = Arc::new(TestExecutor {}); + + // make sure we're starting from where we think we are + assert_eq!(ComparableStats::from(&cache.stats), expected_stats,); + + // insert some executors + cache.put(&[(&program_id1, executor.clone())]); + cache.put(&[(&program_id2, executor.clone())]); + expected_stats.insertions += 2; + assert_eq!(ComparableStats::from(&cache.stats), expected_stats); + + // replace a one-hit-wonder executor + cache.put(&[(&program_id1, executor.clone())]); + expected_stats.replacements += 1; + expected_stats.one_hit_wonders += 1; + assert_eq!(ComparableStats::from(&cache.stats), expected_stats); + + // hit some executors + cache.get(&program_id1); + cache.get(&program_id1); + cache.get(&program_id2); + expected_stats.hits += 3; + assert_eq!(ComparableStats::from(&cache.stats), expected_stats); + + // miss an executor + cache.get(&Pubkey::new_unique()); + expected_stats.misses += 1; + assert_eq!(ComparableStats::from(&cache.stats), expected_stats); + + // evict an executor + cache.put(&[(&Pubkey::new_unique(), executor.clone())]); + expected_stats.insertions += 1; + expected_stats.evictions.insert(program_id2, 1); + assert_eq!(ComparableStats::from(&cache.stats), expected_stats); + + // make sure stats are cleared in new_from_parent + assert_eq!( + ComparableStats::from( + &CachedExecutors::new_from_parent_bank_executors(&cache, CURRENT_EPOCH).stats + ), + ComparableStats::default() + ); + assert_eq!( + ComparableStats::from( + &CachedExecutors::new_from_parent_bank_executors(&cache, CURRENT_EPOCH + 1).stats + ), + ComparableStats::default() + ); + } +} diff --git a/program-runtime/src/invoke_context.rs b/program-runtime/src/invoke_context.rs index 5b080f30a..5fabd643b 100644 --- a/program-runtime/src/invoke_context.rs +++ b/program-runtime/src/invoke_context.rs @@ -2,6 +2,7 @@ use { crate::{ accounts_data_meter::AccountsDataMeter, compute_budget::ComputeBudget, + executor_cache::{Executor, Executors, TransactionExecutor}, ic_logger_msg, ic_msg, log_collector::LogCollector, pre_account::PreAccount, @@ -28,7 +29,6 @@ use { alloc::Layout, borrow::Cow, cell::RefCell, - collections::HashMap, fmt::{self, Debug}, rc::Rc, sync::Arc, @@ -58,78 +58,6 @@ impl std::fmt::Debug for BuiltinProgram { } } -/// Program executor -pub trait Executor: Debug + Send + Sync { - /// Execute the program - fn execute( - &self, - first_instruction_account: IndexOfAccount, - invoke_context: &mut InvokeContext, - ) -> Result<(), InstructionError>; -} - -pub type Executors = HashMap; - -#[repr(u8)] -#[derive(PartialEq, Debug)] -enum TransactionExecutorStatus { - /// Executor was already in the cache, no update needed - Cached, - /// Executor was missing from the cache, but not updated - Missing, - /// Executor is for an updated program - Updated, -} - -/// Tracks whether a given executor is "dirty" and needs to updated in the -/// executors cache -#[derive(Debug)] -pub struct TransactionExecutor { - executor: Arc, - status: TransactionExecutorStatus, -} - -impl TransactionExecutor { - /// Wraps an executor and tracks that it doesn't need to be updated in the - /// executors cache. - pub fn new_cached(executor: Arc) -> Self { - Self { - executor, - status: TransactionExecutorStatus::Cached, - } - } - - /// Wraps an executor and tracks that it needs to be updated in the - /// executors cache. - pub fn new_miss(executor: Arc) -> Self { - Self { - executor, - status: TransactionExecutorStatus::Missing, - } - } - - /// Wraps an executor and tracks that it needs to be updated in the - /// executors cache only if the transaction succeeded. - pub fn new_updated(executor: Arc) -> Self { - Self { - executor, - status: TransactionExecutorStatus::Updated, - } - } - - pub fn is_missing(&self) -> bool { - self.status == TransactionExecutorStatus::Missing - } - - pub fn is_updated(&self) -> bool { - self.status == TransactionExecutorStatus::Updated - } - - pub fn get(&self) -> Arc { - self.executor.clone() - } -} - /// Compute meter pub struct ComputeMeter { remaining: u64, diff --git a/program-runtime/src/lib.rs b/program-runtime/src/lib.rs index 2a9be8d8c..88bfe9583 100644 --- a/program-runtime/src/lib.rs +++ b/program-runtime/src/lib.rs @@ -11,6 +11,7 @@ extern crate solana_metrics; pub mod accounts_data_meter; pub mod compute_budget; +pub mod executor_cache; pub mod invoke_context; pub mod log_collector; pub mod pre_account; diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 576bfb868..0bad7e57f 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -5245,6 +5245,7 @@ dependencies = [ "log", "num-derive", "num-traits", + "rand 0.7.3", "rustc_version", "serde", "solana-frozen-abi 1.15.0", diff --git a/programs/bpf_loader/src/lib.rs b/programs/bpf_loader/src/lib.rs index b97263823..b828b5814 100644 --- a/programs/bpf_loader/src/lib.rs +++ b/programs/bpf_loader/src/lib.rs @@ -21,8 +21,9 @@ use { log::{log_enabled, trace, Level::Trace}, solana_measure::measure::Measure, solana_program_runtime::{ + executor_cache::Executor, ic_logger_msg, ic_msg, - invoke_context::{ComputeMeter, Executor, InvokeContext}, + invoke_context::{ComputeMeter, InvokeContext}, log_collector::LogCollector, stable_log, sysvar_cache::get_sysvar_with_account_check, diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 376631d85..91aebe47f 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -1427,7 +1427,7 @@ mod tests { }, assert_matches::assert_matches, solana_address_lookup_table_program::state::LookupTableMeta, - solana_program_runtime::invoke_context::Executors, + solana_program_runtime::executor_cache::Executors, solana_sdk::{ account::{AccountSharedData, WritableAccount}, epoch_schedule::EpochSchedule, diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index e729f7dad..760639aa5 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -78,7 +78,6 @@ use { dashmap::{DashMap, DashSet}, itertools::Itertools, log::*, - rand::Rng, rayon::{ iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}, ThreadPool, ThreadPoolBuilder, @@ -88,9 +87,8 @@ use { solana_program_runtime::{ accounts_data_meter::MAX_ACCOUNTS_DATA_LEN, compute_budget::{self, ComputeBudget}, - invoke_context::{ - BuiltinProgram, Executor, Executors, ProcessInstructionWithContext, TransactionExecutor, - }, + executor_cache::{CachedExecutors, Executors, TransactionExecutor, MAX_CACHED_EXECUTORS}, + invoke_context::{BuiltinProgram, ProcessInstructionWithContext}, log_collector::LogCollector, sysvar_cache::SysvarCache, timings::{ExecuteTimingType, ExecuteTimings}, @@ -157,7 +155,7 @@ use { collections::{HashMap, HashSet}, convert::{TryFrom, TryInto}, fmt, mem, - ops::{Deref, Div, RangeInclusive}, + ops::{Deref, RangeInclusive}, path::PathBuf, rc::Rc, sync::{ @@ -313,272 +311,6 @@ pub struct SquashTiming { type EpochCount = u64; -mod executor_cache { - use {super::*, log}; - - #[derive(Debug, Default)] - pub struct Stats { - pub hits: AtomicU64, - pub misses: AtomicU64, - pub evictions: HashMap, - pub insertions: AtomicU64, - pub replacements: AtomicU64, - pub one_hit_wonders: AtomicU64, - } - - impl Stats { - pub fn submit(&self, slot: Slot) { - let hits = self.hits.load(Relaxed); - let misses = self.misses.load(Relaxed); - let insertions = self.insertions.load(Relaxed); - let replacements = self.replacements.load(Relaxed); - let one_hit_wonders = self.one_hit_wonders.load(Relaxed); - let evictions: u64 = self.evictions.values().sum(); - datapoint_info!( - "bank-executor-cache-stats", - ("slot", slot, i64), - ("hits", hits, i64), - ("misses", misses, i64), - ("evictions", evictions, i64), - ("insertions", insertions, i64), - ("replacements", replacements, i64), - ("one_hit_wonders", one_hit_wonders, i64), - ); - debug!( - "Executor Cache Stats -- Hits: {}, Misses: {}, Evictions: {}, Insertions: {}, Replacements: {}, One-Hit-Wonders: {}", - hits, misses, evictions, insertions, replacements, one_hit_wonders, - ); - if log_enabled!(log::Level::Trace) && !self.evictions.is_empty() { - let mut evictions = self.evictions.iter().collect::>(); - evictions.sort_by_key(|e| e.1); - let evictions = evictions - .into_iter() - .rev() - .map(|(program_id, evictions)| { - format!(" {:<44} {}", program_id.to_string(), evictions) - }) - .collect::>(); - let evictions = evictions.join("\n"); - trace!( - "Eviction Details:\n {:<44} {}\n{}", - "Program", - "Count", - evictions - ); - } - } - } -} - -const MAX_CACHED_EXECUTORS: usize = 256; -#[derive(Debug)] -struct CachedExecutorsEntry { - prev_epoch_count: u64, - epoch_count: AtomicU64, - executor: Arc, - hit_count: AtomicU64, -} - -impl Clone for CachedExecutorsEntry { - fn clone(&self) -> Self { - Self { - prev_epoch_count: self.prev_epoch_count, - epoch_count: AtomicU64::new(self.epoch_count.load(Relaxed)), - executor: self.executor.clone(), - hit_count: AtomicU64::new(self.hit_count.load(Relaxed)), - } - } -} - -/// LFU Cache of executors with single-epoch memory of usage counts -#[derive(Debug)] -struct CachedExecutors { - capacity: usize, - current_epoch: Epoch, - pub(self) executors: HashMap, - stats: executor_cache::Stats, -} - -impl Default for CachedExecutors { - fn default() -> Self { - Self { - capacity: MAX_CACHED_EXECUTORS, - current_epoch: Epoch::default(), - executors: HashMap::default(), - stats: executor_cache::Stats::default(), - } - } -} - -#[cfg(RUSTC_WITH_SPECIALIZATION)] -impl AbiExample for CachedExecutors { - fn example() -> Self { - // Delegate AbiExample impl to Default before going deep and stuck with - // not easily impl-able Arc due to rust's coherence issue - // This is safe because CachedExecutors isn't serializable by definition. - Self::default() - } -} - -impl CachedExecutors { - fn new(max_capacity: usize, current_epoch: Epoch) -> Self { - Self { - capacity: max_capacity, - current_epoch, - executors: HashMap::new(), - stats: executor_cache::Stats::default(), - } - } - - fn new_from_parent_bank_executors( - parent_bank_executors: &CachedExecutors, - current_epoch: Epoch, - ) -> Self { - let executors = if parent_bank_executors.current_epoch == current_epoch { - parent_bank_executors.executors.clone() - } else { - parent_bank_executors - .executors - .iter() - .map(|(&key, entry)| { - let entry = CachedExecutorsEntry { - prev_epoch_count: entry.epoch_count.load(Relaxed), - epoch_count: AtomicU64::default(), - executor: entry.executor.clone(), - hit_count: AtomicU64::new(entry.hit_count.load(Relaxed)), - }; - (key, entry) - }) - .collect() - }; - - Self { - capacity: parent_bank_executors.capacity, - current_epoch, - executors, - stats: executor_cache::Stats::default(), - } - } - - fn get(&self, pubkey: &Pubkey) -> Option> { - if let Some(entry) = self.executors.get(pubkey) { - self.stats.hits.fetch_add(1, Relaxed); - entry.epoch_count.fetch_add(1, Relaxed); - entry.hit_count.fetch_add(1, Relaxed); - Some(entry.executor.clone()) - } else { - self.stats.misses.fetch_add(1, Relaxed); - None - } - } - - fn put(&mut self, executors: &[(&Pubkey, Arc)]) { - let mut new_executors: Vec<_> = executors - .iter() - .filter_map(|(key, executor)| { - if let Some(mut entry) = self.remove(key) { - self.stats.replacements.fetch_add(1, Relaxed); - entry.executor = executor.clone(); - let _ = self.executors.insert(**key, entry); - None - } else { - self.stats.insertions.fetch_add(1, Relaxed); - Some((*key, executor)) - } - }) - .collect(); - - if !new_executors.is_empty() { - let mut counts = self - .executors - .iter() - .map(|(key, entry)| { - let count = entry.prev_epoch_count + entry.epoch_count.load(Relaxed); - (key, count) - }) - .collect::>(); - counts.sort_unstable_by_key(|(_, count)| *count); - - let primer_counts = Self::get_primer_counts(counts.as_slice(), new_executors.len()); - - if self.executors.len() >= self.capacity { - let mut least_keys = counts - .iter() - .take(new_executors.len()) - .map(|least| *least.0) - .collect::>(); - for least_key in least_keys.drain(..) { - let _ = self.remove(&least_key); - self.stats - .evictions - .entry(least_key) - .and_modify(|c| saturating_add_assign!(*c, 1)) - .or_insert(1); - } - } - - for ((key, executor), primer_count) in new_executors.drain(..).zip(primer_counts) { - let entry = CachedExecutorsEntry { - prev_epoch_count: 0, - epoch_count: AtomicU64::new(primer_count), - executor: executor.clone(), - hit_count: AtomicU64::new(1), - }; - let _ = self.executors.insert(*key, entry); - } - } - } - - fn remove(&mut self, pubkey: &Pubkey) -> Option { - let maybe_entry = self.executors.remove(pubkey); - if let Some(entry) = maybe_entry.as_ref() { - if entry.hit_count.load(Relaxed) == 1 { - self.stats.one_hit_wonders.fetch_add(1, Relaxed); - } - } - maybe_entry - } - - fn clear(&mut self) { - *self = CachedExecutors::default(); - } - - fn get_primer_count_upper_bound_inclusive(counts: &[(&Pubkey, u64)]) -> u64 { - const PRIMER_COUNT_TARGET_PERCENTILE: u64 = 85; - #[allow(clippy::assertions_on_constants)] - { - assert!(PRIMER_COUNT_TARGET_PERCENTILE <= 100); - } - // Executor use-frequencies are assumed to fit a Pareto distribution. Choose an - // upper-bound for our primer count as the actual count at the target rank to avoid - // an upward bias - - let target_index = u64::try_from(counts.len().saturating_sub(1)) - .ok() - .and_then(|counts| { - let index = counts - .saturating_mul(PRIMER_COUNT_TARGET_PERCENTILE) - .div(100); // switch to u64::saturating_div once stable - usize::try_from(index).ok() - }) - .unwrap_or(0); - - counts - .get(target_index) - .map(|(_, count)| *count) - .unwrap_or(0) - } - - fn get_primer_counts(counts: &[(&Pubkey, u64)], num_counts: usize) -> Vec { - let max_primer_count = Self::get_primer_count_upper_bound_inclusive(counts); - let mut rng = rand::thread_rng(); - - (0..num_counts) - .map(|_| rng.gen_range(0, max_primer_count.saturating_add(1))) - .collect::>() - } -} - #[derive(Debug)] pub struct BankRc { /// where all the Accounts are stored @@ -8136,8 +7868,10 @@ pub(crate) mod tests { status_cache::MAX_CACHE_ENTRIES, }, crossbeam_channel::{bounded, unbounded}, + rand::Rng, solana_program_runtime::{ compute_budget::MAX_COMPUTE_UNIT_LIMIT, + executor_cache::Executor, invoke_context::InvokeContext, prioritization_fee::{PrioritizationFeeDetails, PrioritizationFeeType}, }, @@ -15337,244 +15071,6 @@ pub(crate) mod tests { } } - #[test] - fn test_cached_executors() { - let key1 = solana_sdk::pubkey::new_rand(); - let key2 = solana_sdk::pubkey::new_rand(); - let key3 = solana_sdk::pubkey::new_rand(); - let key4 = solana_sdk::pubkey::new_rand(); - let executor: Arc = Arc::new(TestExecutor {}); - let mut cache = CachedExecutors::new(3, 0); - - cache.put(&[(&key1, executor.clone())]); - cache.put(&[(&key2, executor.clone())]); - cache.put(&[(&key3, executor.clone())]); - assert!(cache.get(&key1).is_some()); - assert!(cache.get(&key2).is_some()); - assert!(cache.get(&key3).is_some()); - - assert!(cache.get(&key1).is_some()); - assert!(cache.get(&key1).is_some()); - assert!(cache.get(&key2).is_some()); - cache.put(&[(&key4, executor.clone())]); - assert!(cache.get(&key4).is_some()); - let num_retained = [&key1, &key2, &key3] - .iter() - .filter_map(|key| cache.get(key)) - .count(); - assert_eq!(num_retained, 2); - - assert!(cache.get(&key4).is_some()); - assert!(cache.get(&key4).is_some()); - assert!(cache.get(&key4).is_some()); - cache.put(&[(&key3, executor.clone())]); - assert!(cache.get(&key3).is_some()); - let num_retained = [&key1, &key2, &key4] - .iter() - .filter_map(|key| cache.get(key)) - .count(); - assert_eq!(num_retained, 2); - } - - #[test] - fn test_cached_executor_eviction() { - let key1 = solana_sdk::pubkey::new_rand(); - let key2 = solana_sdk::pubkey::new_rand(); - let key3 = solana_sdk::pubkey::new_rand(); - let key4 = solana_sdk::pubkey::new_rand(); - let executor: Arc = Arc::new(TestExecutor {}); - let mut cache = CachedExecutors::new(3, 0); - assert!(cache.current_epoch == 0); - - cache.put(&[(&key1, executor.clone())]); - cache.put(&[(&key2, executor.clone())]); - cache.put(&[(&key3, executor.clone())]); - assert!(cache.get(&key1).is_some()); - assert!(cache.get(&key1).is_some()); - assert!(cache.get(&key1).is_some()); - - let mut cache = CachedExecutors::new_from_parent_bank_executors(&cache, 1); - assert!(cache.current_epoch == 1); - - assert!(cache.get(&key2).is_some()); - assert!(cache.get(&key2).is_some()); - assert!(cache.get(&key3).is_some()); - cache.put(&[(&key4, executor.clone())]); - - assert!(cache.get(&key4).is_some()); - let num_retained = [&key1, &key2, &key3] - .iter() - .filter_map(|key| cache.get(key)) - .count(); - assert_eq!(num_retained, 2); - - cache.put(&[(&key1, executor.clone())]); - cache.put(&[(&key3, executor.clone())]); - assert!(cache.get(&key1).is_some()); - assert!(cache.get(&key3).is_some()); - let num_retained = [&key2, &key4] - .iter() - .filter_map(|key| cache.get(key)) - .count(); - assert_eq!(num_retained, 1); - - cache = CachedExecutors::new_from_parent_bank_executors(&cache, 2); - assert!(cache.current_epoch == 2); - - cache.put(&[(&key3, executor.clone())]); - assert!(cache.get(&key3).is_some()); - } - - #[test] - fn test_cached_executors_evicts_smallest() { - let key1 = solana_sdk::pubkey::new_rand(); - let key2 = solana_sdk::pubkey::new_rand(); - let key3 = solana_sdk::pubkey::new_rand(); - let executor: Arc = Arc::new(TestExecutor {}); - let mut cache = CachedExecutors::new(2, 0); - - cache.put(&[(&key1, executor.clone())]); - for _ in 0..5 { - let _ = cache.get(&key1); - } - cache.put(&[(&key2, executor.clone())]); - // make key1's use-count for sure greater than key2's - let _ = cache.get(&key1); - - let mut entries = cache - .executors - .iter() - .map(|(k, v)| (*k, v.epoch_count.load(Relaxed))) - .collect::>(); - entries.sort_by_key(|(_, v)| *v); - assert!(entries[0].1 < entries[1].1); - - cache.put(&[(&key3, executor.clone())]); - assert!(cache.get(&entries[0].0).is_none()); - assert!(cache.get(&entries[1].0).is_some()); - } - - #[test] - fn test_cached_executors_one_hit_wonder_counter() { - let mut cache = CachedExecutors::new(1, 0); - - let one_hit_wonder = Pubkey::new_unique(); - let popular = Pubkey::new_unique(); - let executor: Arc = Arc::new(TestExecutor {}); - - // make sure we're starting from where we think we are - assert_eq!(cache.stats.one_hit_wonders.load(Relaxed), 0); - - // add our one-hit-wonder - cache.put(&[(&one_hit_wonder, executor.clone())]); - assert_eq!(cache.executors[&one_hit_wonder].hit_count.load(Relaxed), 1); - // displace the one-hit-wonder with "popular program" - cache.put(&[(&popular, executor.clone())]); - assert_eq!(cache.executors[&popular].hit_count.load(Relaxed), 1); - - // one-hit-wonder counter incremented - assert_eq!(cache.stats.one_hit_wonders.load(Relaxed), 1); - - // make "popular program" popular - cache.get(&popular).unwrap(); - assert_eq!(cache.executors[&popular].hit_count.load(Relaxed), 2); - - // evict "popular program" - cache.put(&[(&one_hit_wonder, executor.clone())]); - assert_eq!(cache.executors[&one_hit_wonder].hit_count.load(Relaxed), 1); - - // one-hit-wonder counter not incremented - assert_eq!(cache.stats.one_hit_wonders.load(Relaxed), 1); - } - - #[test] - fn test_cached_executors_stats() { - #[derive(Debug, Default, PartialEq)] - struct ComparableStats { - hits: u64, - misses: u64, - evictions: HashMap, - insertions: u64, - replacements: u64, - one_hit_wonders: u64, - } - impl From<&executor_cache::Stats> for ComparableStats { - fn from(stats: &executor_cache::Stats) -> Self { - let executor_cache::Stats { - hits, - misses, - evictions, - insertions, - replacements, - one_hit_wonders, - } = stats; - ComparableStats { - hits: hits.load(Relaxed), - misses: misses.load(Relaxed), - evictions: evictions.clone(), - insertions: insertions.load(Relaxed), - replacements: replacements.load(Relaxed), - one_hit_wonders: one_hit_wonders.load(Relaxed), - } - } - } - - const CURRENT_EPOCH: Epoch = 0; - let mut cache = CachedExecutors::new(2, CURRENT_EPOCH); - let mut expected_stats = ComparableStats::default(); - - let program_id1 = Pubkey::new_unique(); - let program_id2 = Pubkey::new_unique(); - let executor: Arc = Arc::new(TestExecutor {}); - - // make sure we're starting from where we think we are - assert_eq!(ComparableStats::from(&cache.stats), expected_stats,); - - // insert some executors - cache.put(&[(&program_id1, executor.clone())]); - cache.put(&[(&program_id2, executor.clone())]); - expected_stats.insertions += 2; - assert_eq!(ComparableStats::from(&cache.stats), expected_stats); - - // replace a one-hit-wonder executor - cache.put(&[(&program_id1, executor.clone())]); - expected_stats.replacements += 1; - expected_stats.one_hit_wonders += 1; - assert_eq!(ComparableStats::from(&cache.stats), expected_stats); - - // hit some executors - cache.get(&program_id1); - cache.get(&program_id1); - cache.get(&program_id2); - expected_stats.hits += 3; - assert_eq!(ComparableStats::from(&cache.stats), expected_stats); - - // miss an executor - cache.get(&Pubkey::new_unique()); - expected_stats.misses += 1; - assert_eq!(ComparableStats::from(&cache.stats), expected_stats); - - // evict an executor - cache.put(&[(&Pubkey::new_unique(), executor.clone())]); - expected_stats.insertions += 1; - expected_stats.evictions.insert(program_id2, 1); - assert_eq!(ComparableStats::from(&cache.stats), expected_stats); - - // make sure stats are cleared in new_from_parent - assert_eq!( - ComparableStats::from( - &CachedExecutors::new_from_parent_bank_executors(&cache, CURRENT_EPOCH).stats - ), - ComparableStats::default() - ); - assert_eq!( - ComparableStats::from( - &CachedExecutors::new_from_parent_bank_executors(&cache, CURRENT_EPOCH + 1).stats - ), - ComparableStats::default() - ); - } - #[test] fn test_bank_executor_cache() { solana_logger::setup(); @@ -18129,26 +17625,6 @@ pub(crate) mod tests { ); } - #[test] - fn test_executor_cache_get_primer_count_upper_bound_inclusive() { - let pubkey = Pubkey::default(); - let v = []; - assert_eq!( - CachedExecutors::get_primer_count_upper_bound_inclusive(&v), - 0 - ); - let v = [(&pubkey, 1)]; - assert_eq!( - CachedExecutors::get_primer_count_upper_bound_inclusive(&v), - 1 - ); - let v = (0u64..10).map(|i| (&pubkey, i)).collect::>(); - assert_eq!( - CachedExecutors::get_primer_count_upper_bound_inclusive(v.as_slice()), - 7 - ); - } - #[derive(Serialize, Deserialize)] enum MockTransferInstruction { Transfer(u64), diff --git a/runtime/src/message_processor.rs b/runtime/src/message_processor.rs index 68507c96a..ac8639c5c 100644 --- a/runtime/src/message_processor.rs +++ b/runtime/src/message_processor.rs @@ -3,7 +3,8 @@ use { solana_measure::measure::Measure, solana_program_runtime::{ compute_budget::ComputeBudget, - invoke_context::{BuiltinProgram, Executors, InvokeContext}, + executor_cache::Executors, + invoke_context::{BuiltinProgram, InvokeContext}, log_collector::LogCollector, sysvar_cache::SysvarCache, timings::{ExecuteDetailsTimings, ExecuteTimings},