Add copy-on-write executor cache (#12502)

* Add copy-on-write executor cache

* Add remove_executor function to the bank
This commit is contained in:
Jack May 2020-09-26 10:54:11 -07:00 committed by GitHub
parent a36252bfa0
commit 965f653471
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 133 additions and 10 deletions

View File

@ -76,8 +76,10 @@ use std::{
path::PathBuf,
ptr,
rc::Rc,
sync::atomic::{AtomicBool, AtomicU64, Ordering::Relaxed},
sync::{Arc, RwLock, RwLockReadGuard},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering::Relaxed},
LockResult, RwLockWriteGuard, {Arc, RwLock, RwLockReadGuard},
},
};
// Partial SPL Token v2.0.x declarations inlined to avoid an external dependency on the spl-token crate
@ -150,6 +152,40 @@ impl Builtin {
}
}
/// Copy-on-write holder of CachedExecutors
#[derive(AbiExample, Default)]
struct CowCachedExecutors {
shared: bool,
executors: Arc<RwLock<CachedExecutors>>,
}
impl Clone for CowCachedExecutors {
fn clone(&self) -> Self {
Self {
shared: true,
executors: self.executors.clone(),
}
}
}
impl CowCachedExecutors {
fn new(executors: Arc<RwLock<CachedExecutors>>) -> Self {
Self {
shared: true,
executors,
}
}
fn read(&self) -> LockResult<RwLockReadGuard<CachedExecutors>> {
self.executors.read()
}
fn write(&mut self) -> LockResult<RwLockWriteGuard<CachedExecutors>> {
if self.shared {
self.shared = false;
let local_cache = (*self.executors.read().unwrap()).clone();
self.executors = Arc::new(RwLock::new(local_cache));
}
self.executors.write()
}
}
const MAX_CACHED_EXECUTORS: usize = 100; // 10 MB assuming programs are around 100k
/// LFU Cache of executors
@ -166,20 +202,35 @@ impl Default for CachedExecutors {
}
}
}
impl Clone for CachedExecutors {
fn clone(&self) -> Self {
let mut executors = HashMap::new();
for (key, (count, executor)) in self.executors.iter() {
executors.insert(
*key,
(AtomicU64::new(count.load(Relaxed)), executor.clone()),
);
}
Self {
max: self.max,
executors,
}
}
}
impl CachedExecutors {
pub fn new(max: usize) -> Self {
fn new(max: usize) -> Self {
Self {
max,
executors: HashMap::new(),
}
}
pub fn get(&self, pubkey: &Pubkey) -> Option<Arc<dyn Executor>> {
fn get(&self, pubkey: &Pubkey) -> Option<Arc<dyn Executor>> {
self.executors.get(pubkey).map(|(count, executor)| {
count.fetch_add(1, Relaxed);
executor.clone()
})
}
pub fn put(&mut self, pubkey: &Pubkey, executor: Arc<dyn Executor>) {
fn put(&mut self, pubkey: &Pubkey, executor: Arc<dyn Executor>) {
if !self.executors.contains_key(pubkey) {
if self.executors.len() >= self.max {
let mut least = u64::MAX;
@ -200,6 +251,9 @@ impl CachedExecutors {
.insert(*pubkey, (AtomicU64::new(0), executor));
}
}
fn remove(&mut self, pubkey: &Pubkey) {
let _ = self.executors.remove(pubkey);
}
}
#[derive(Default)]
@ -525,7 +579,7 @@ pub struct Bank {
pub rewards_pool_pubkeys: Arc<HashSet<Pubkey>>,
/// Cached executors
cached_executors: Arc<RwLock<CachedExecutors>>,
cached_executors: RwLock<CowCachedExecutors>,
transaction_debug_keys: Option<Arc<HashSet<Pubkey>>>,
@ -651,7 +705,7 @@ impl Bank {
cluster_type: parent.cluster_type,
lazy_rent_collection: AtomicBool::new(parent.lazy_rent_collection.load(Relaxed)),
rewards_pool_pubkeys: parent.rewards_pool_pubkeys.clone(),
cached_executors: parent.cached_executors.clone(),
cached_executors: RwLock::new((*parent.cached_executors.read().unwrap()).clone()),
transaction_debug_keys: parent.transaction_debug_keys.clone(),
feature_set: parent.feature_set.clone(),
};
@ -756,7 +810,9 @@ impl Bank {
cluster_type: Some(genesis_config.cluster_type),
lazy_rent_collection: new(),
rewards_pool_pubkeys: new(),
cached_executors: Arc::new(RwLock::new(CachedExecutors::new(MAX_CACHED_EXECUTORS))),
cached_executors: RwLock::new(CowCachedExecutors::new(Arc::new(RwLock::new(
CachedExecutors::new(MAX_CACHED_EXECUTORS),
)))),
transaction_debug_keys: debug_keys,
feature_set: new(),
};
@ -1936,7 +1992,8 @@ impl Bank {
num_executors += instruction_loaders.len();
}
let mut executors = HashMap::with_capacity(num_executors);
let cache = self.cached_executors.read().unwrap();
let cow_cache = self.cached_executors.read().unwrap();
let cache = cow_cache.read().unwrap();
for key in message.account_keys.iter() {
if let Some(executor) = cache.get(key) {
@ -1961,13 +2018,21 @@ impl Bank {
fn update_executors(&self, executors: Rc<RefCell<Executors>>) {
let executors = executors.borrow();
if executors.is_dirty {
let mut cache = self.cached_executors.write().unwrap();
let mut cow_cache = self.cached_executors.write().unwrap();
let mut cache = cow_cache.write().unwrap();
for (key, executor) in executors.executors.iter() {
cache.put(key, (*executor).clone());
}
}
}
/// Remove an executor from the bank's cache
pub fn remove_executor(&self, pubkey: &Pubkey) {
let mut cow_cache = self.cached_executors.write().unwrap();
let mut cache = cow_cache.write().unwrap();
cache.remove(pubkey);
}
#[allow(clippy::type_complexity)]
pub fn load_and_execute_transactions(
&self,
@ -9011,6 +9076,64 @@ mod tests {
assert!(executors.borrow().executors.contains_key(&key2));
assert!(executors.borrow().executors.contains_key(&key3));
assert!(executors.borrow().executors.contains_key(&key4));
bank.remove_executor(&key1);
bank.remove_executor(&key2);
bank.remove_executor(&key3);
bank.remove_executor(&key4);
let executors = bank.get_executors(&message, loaders);
assert_eq!(executors.borrow().executors.len(), 0);
assert!(!executors.borrow().executors.contains_key(&key1));
assert!(!executors.borrow().executors.contains_key(&key2));
assert!(!executors.borrow().executors.contains_key(&key3));
assert!(!executors.borrow().executors.contains_key(&key4));
}
#[test]
fn test_bank_executor_cow() {
solana_logger::setup();
let (genesis_config, _) = create_genesis_config(1);
let root = Arc::new(Bank::new(&genesis_config));
let key1 = Pubkey::new_rand();
let key2 = Pubkey::new_rand();
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});
let loaders = &[vec![(key1, Account::default()), (key2, Account::default())]];
// add one to root bank
let mut executors = Executors::default();
executors.insert(key1, executor.clone());
let executors = Rc::new(RefCell::new(executors));
root.update_executors(executors);
let executors = root.get_executors(&Message::default(), loaders);
assert_eq!(executors.borrow().executors.len(), 1);
let fork1 = Bank::new_from_parent(&root, &Pubkey::default(), 1);
let fork2 = Bank::new_from_parent(&root, &Pubkey::default(), 1);
let executors = fork1.get_executors(&Message::default(), loaders);
assert_eq!(executors.borrow().executors.len(), 1);
let executors = fork2.get_executors(&Message::default(), loaders);
assert_eq!(executors.borrow().executors.len(), 1);
let mut executors = Executors::default();
executors.insert(key2, executor.clone());
let executors = Rc::new(RefCell::new(executors));
fork1.update_executors(executors);
let executors = fork1.get_executors(&Message::default(), loaders);
assert_eq!(executors.borrow().executors.len(), 2);
let executors = fork2.get_executors(&Message::default(), loaders);
assert_eq!(executors.borrow().executors.len(), 1);
fork1.remove_executor(&key1);
let executors = fork1.get_executors(&Message::default(), loaders);
assert_eq!(executors.borrow().executors.len(), 1);
let executors = fork2.get_executors(&Message::default(), loaders);
assert_eq!(executors.borrow().executors.len(), 1);
}
#[test]