From 965f6534712c2cd3414b6f96c13fa37ee3edab45 Mon Sep 17 00:00:00 2001 From: Jack May Date: Sat, 26 Sep 2020 10:54:11 -0700 Subject: [PATCH] Add copy-on-write executor cache (#12502) * Add copy-on-write executor cache * Add remove_executor function to the bank --- runtime/src/bank.rs | 143 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 133 insertions(+), 10 deletions(-) diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index d5ab818f2f..b20d38f1d8 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -76,8 +76,10 @@ use std::{ path::PathBuf, ptr, rc::Rc, - sync::atomic::{AtomicBool, AtomicU64, Ordering::Relaxed}, - sync::{Arc, RwLock, RwLockReadGuard}, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering::Relaxed}, + LockResult, RwLockWriteGuard, {Arc, RwLock, RwLockReadGuard}, + }, }; // Partial SPL Token v2.0.x declarations inlined to avoid an external dependency on the spl-token crate @@ -150,6 +152,40 @@ impl Builtin { } } +/// Copy-on-write holder of CachedExecutors +#[derive(AbiExample, Default)] +struct CowCachedExecutors { + shared: bool, + executors: Arc>, +} +impl Clone for CowCachedExecutors { + fn clone(&self) -> Self { + Self { + shared: true, + executors: self.executors.clone(), + } + } +} +impl CowCachedExecutors { + fn new(executors: Arc>) -> Self { + Self { + shared: true, + executors, + } + } + fn read(&self) -> LockResult> { + self.executors.read() + } + fn write(&mut self) -> LockResult> { + if self.shared { + self.shared = false; + let local_cache = (*self.executors.read().unwrap()).clone(); + self.executors = Arc::new(RwLock::new(local_cache)); + } + self.executors.write() + } +} + const MAX_CACHED_EXECUTORS: usize = 100; // 10 MB assuming programs are around 100k /// LFU Cache of executors @@ -166,20 +202,35 @@ impl Default for CachedExecutors { } } } +impl Clone for CachedExecutors { + fn clone(&self) -> Self { + let mut executors = HashMap::new(); + for (key, (count, executor)) in self.executors.iter() { + executors.insert( + *key, + (AtomicU64::new(count.load(Relaxed)), executor.clone()), + ); + } + Self { + max: self.max, + executors, + } + } +} impl CachedExecutors { - pub fn new(max: usize) -> Self { + fn new(max: usize) -> Self { Self { max, executors: HashMap::new(), } } - pub fn get(&self, pubkey: &Pubkey) -> Option> { + fn get(&self, pubkey: &Pubkey) -> Option> { self.executors.get(pubkey).map(|(count, executor)| { count.fetch_add(1, Relaxed); executor.clone() }) } - pub fn put(&mut self, pubkey: &Pubkey, executor: Arc) { + fn put(&mut self, pubkey: &Pubkey, executor: Arc) { if !self.executors.contains_key(pubkey) { if self.executors.len() >= self.max { let mut least = u64::MAX; @@ -200,6 +251,9 @@ impl CachedExecutors { .insert(*pubkey, (AtomicU64::new(0), executor)); } } + fn remove(&mut self, pubkey: &Pubkey) { + let _ = self.executors.remove(pubkey); + } } #[derive(Default)] @@ -525,7 +579,7 @@ pub struct Bank { pub rewards_pool_pubkeys: Arc>, /// Cached executors - cached_executors: Arc>, + cached_executors: RwLock, transaction_debug_keys: Option>>, @@ -651,7 +705,7 @@ impl Bank { cluster_type: parent.cluster_type, lazy_rent_collection: AtomicBool::new(parent.lazy_rent_collection.load(Relaxed)), rewards_pool_pubkeys: parent.rewards_pool_pubkeys.clone(), - cached_executors: parent.cached_executors.clone(), + cached_executors: RwLock::new((*parent.cached_executors.read().unwrap()).clone()), transaction_debug_keys: parent.transaction_debug_keys.clone(), feature_set: parent.feature_set.clone(), }; @@ -756,7 +810,9 @@ impl Bank { cluster_type: Some(genesis_config.cluster_type), lazy_rent_collection: new(), rewards_pool_pubkeys: new(), - cached_executors: Arc::new(RwLock::new(CachedExecutors::new(MAX_CACHED_EXECUTORS))), + cached_executors: RwLock::new(CowCachedExecutors::new(Arc::new(RwLock::new( + CachedExecutors::new(MAX_CACHED_EXECUTORS), + )))), transaction_debug_keys: debug_keys, feature_set: new(), }; @@ -1936,7 +1992,8 @@ impl Bank { num_executors += instruction_loaders.len(); } let mut executors = HashMap::with_capacity(num_executors); - let cache = self.cached_executors.read().unwrap(); + let cow_cache = self.cached_executors.read().unwrap(); + let cache = cow_cache.read().unwrap(); for key in message.account_keys.iter() { if let Some(executor) = cache.get(key) { @@ -1961,13 +2018,21 @@ impl Bank { fn update_executors(&self, executors: Rc>) { let executors = executors.borrow(); if executors.is_dirty { - let mut cache = self.cached_executors.write().unwrap(); + let mut cow_cache = self.cached_executors.write().unwrap(); + let mut cache = cow_cache.write().unwrap(); for (key, executor) in executors.executors.iter() { cache.put(key, (*executor).clone()); } } } + /// Remove an executor from the bank's cache + pub fn remove_executor(&self, pubkey: &Pubkey) { + let mut cow_cache = self.cached_executors.write().unwrap(); + let mut cache = cow_cache.write().unwrap(); + cache.remove(pubkey); + } + #[allow(clippy::type_complexity)] pub fn load_and_execute_transactions( &self, @@ -9011,6 +9076,64 @@ mod tests { assert!(executors.borrow().executors.contains_key(&key2)); assert!(executors.borrow().executors.contains_key(&key3)); assert!(executors.borrow().executors.contains_key(&key4)); + + bank.remove_executor(&key1); + bank.remove_executor(&key2); + bank.remove_executor(&key3); + bank.remove_executor(&key4); + let executors = bank.get_executors(&message, loaders); + assert_eq!(executors.borrow().executors.len(), 0); + assert!(!executors.borrow().executors.contains_key(&key1)); + assert!(!executors.borrow().executors.contains_key(&key2)); + assert!(!executors.borrow().executors.contains_key(&key3)); + assert!(!executors.borrow().executors.contains_key(&key4)); + } + + #[test] + fn test_bank_executor_cow() { + solana_logger::setup(); + + let (genesis_config, _) = create_genesis_config(1); + let root = Arc::new(Bank::new(&genesis_config)); + + let key1 = Pubkey::new_rand(); + let key2 = Pubkey::new_rand(); + let executor: Arc = Arc::new(TestExecutor {}); + + let loaders = &[vec![(key1, Account::default()), (key2, Account::default())]]; + + // add one to root bank + let mut executors = Executors::default(); + executors.insert(key1, executor.clone()); + let executors = Rc::new(RefCell::new(executors)); + root.update_executors(executors); + let executors = root.get_executors(&Message::default(), loaders); + assert_eq!(executors.borrow().executors.len(), 1); + + let fork1 = Bank::new_from_parent(&root, &Pubkey::default(), 1); + let fork2 = Bank::new_from_parent(&root, &Pubkey::default(), 1); + + let executors = fork1.get_executors(&Message::default(), loaders); + assert_eq!(executors.borrow().executors.len(), 1); + let executors = fork2.get_executors(&Message::default(), loaders); + assert_eq!(executors.borrow().executors.len(), 1); + + let mut executors = Executors::default(); + executors.insert(key2, executor.clone()); + let executors = Rc::new(RefCell::new(executors)); + fork1.update_executors(executors); + + let executors = fork1.get_executors(&Message::default(), loaders); + assert_eq!(executors.borrow().executors.len(), 2); + let executors = fork2.get_executors(&Message::default(), loaders); + assert_eq!(executors.borrow().executors.len(), 1); + + fork1.remove_executor(&key1); + + let executors = fork1.get_executors(&Message::default(), loaders); + assert_eq!(executors.borrow().executors.len(), 1); + let executors = fork2.get_executors(&Message::default(), loaders); + assert_eq!(executors.borrow().executors.len(), 1); } #[test]