Add Executor Cache Eviction Strategy (#30526)

Co-authored-by: K-anon <IntokuSatori@users.noreply.github.com>
This commit is contained in:
K-anon 2023-03-06 14:07:01 -08:00 committed by GitHub
parent 70c6c7e1f7
commit 36d773810a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 288 additions and 8 deletions

View File

@ -1,5 +1,6 @@
use {
crate::{invoke_context::InvokeContext, timings::ExecuteDetailsTimings},
itertools::Itertools,
solana_measure::measure::Measure,
solana_rbpf::{
elf::Executable,
@ -14,10 +15,15 @@ use {
std::{
collections::HashMap,
fmt::{Debug, Formatter},
sync::{atomic::AtomicU64, Arc},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
},
};
const MAX_CACHE_ENTRIES: usize = 100; // TODO: Tune to size
/// Relationship between two fork IDs
#[derive(Copy, Clone, PartialEq)]
pub enum BlockRelation {
@ -317,14 +323,41 @@ impl LoadedPrograms {
}
/// Evicts programs which were used infrequently
pub fn sort_and_evict(&mut self) {
// TODO: Sort programs by their usage_counter
// TODO: Truncate the end of the list
pub fn sort_and_evict(&mut self, max_cache_entries: Option<usize>) {
// Find eviction candidates and sort by their usage counters
let mut num_cache_entries: usize = 0;
let sorted_candidates = self
.entries
.iter()
.filter(|(_key, programs)| {
num_cache_entries = num_cache_entries.saturating_add(programs.len());
programs.len() == 1
})
.sorted_by_cached_key(|(_key, programs)| {
programs
.get(0)
.unwrap()
.usage_counter
.load(Ordering::Relaxed)
})
.map(|(key, _programs)| *key)
.collect::<Vec<Pubkey>>();
// Calculate how many to remove
let num_to_remove = std::cmp::min(
num_cache_entries.saturating_sub(max_cache_entries.unwrap_or(MAX_CACHE_ENTRIES)),
sorted_candidates.len(),
);
// Remove selected entries
if num_to_remove != 0 {
self.remove_entries(sorted_candidates.into_iter().take(num_to_remove))
}
}
/// Removes the entries at the given keys, if they exist
pub fn remove_entries(&mut self, _key: impl Iterator<Item = Pubkey>) {
// TODO: Remove at primary index level
pub fn remove_entries(&mut self, keys: impl Iterator<Item = Pubkey>) {
for k in keys {
self.entries.remove(&k);
}
}
}
@ -340,7 +373,10 @@ mod tests {
std::{
collections::HashMap,
ops::ControlFlow,
sync::{atomic::AtomicU64, Arc},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
},
};
@ -358,6 +394,243 @@ mod tests {
cache.assign_program(key, Arc::new(LoadedProgram::new_tombstone(slot)))
}
#[test]
fn test_eviction() {
    // Asserts that every tracked program whose key is not in `removed` is still present
    // in the cache and still holds each recorded (deployment slot, usage counter) pair.
    fn assert_survivors_intact(
        cache: &LoadedPrograms,
        programs: &HashMap<Pubkey, Vec<(u64, u64)>>,
        removed: &[Pubkey],
    ) {
        for (key, expected) in programs.iter().filter(|(key, _)| !removed.contains(key)) {
            // Make sure its entry exists
            let cached = cache
                .entries
                .get(key)
                .expect("program that was not evicted must remain in the cache");
            let values_in_cache = cached
                .iter()
                .map(|x| (x.deployment_slot, x.usage_counter.load(Ordering::Relaxed)))
                .collect::<Vec<_>>();
            for entry in expected {
                // Make sure the exact slot and usage counter remain for the entry
                assert!(values_in_cache.contains(entry));
            }
        }
    }

    // Fork graph created for the test
    //         0
    //       /   \
    //      10    5
    //      |     |
    //      20    11
    //      |     | \
    //      22   15  25
    //            |   |
    //           16  27
    // NOTE(review): the graph is built here but nothing below in this test reads it.
    let mut fork_graph = TestForkGraphSpecific::default();
    fork_graph.insert_fork(&[0, 10, 20, 22]);
    fork_graph.insert_fork(&[0, 5, 11, 15, 16]);
    fork_graph.insert_fork(&[0, 5, 11, 25, 27]);

    let mut cache = LoadedPrograms::default();
    // Local mirror of everything inserted: program key -> (deployment slot, usage counter)
    let mut programs = HashMap::<Pubkey, Vec<(u64, u64)>>::new();
    let mut num_total_programs: usize = 0;

    // Inserts one program version into the cache and records it in the local mirror.
    let mut insert_program = |key: Pubkey, deployment_slot: u64, usage_counter: u64| {
        cache.replenish(
            key,
            new_test_loaded_program_with_usage(
                deployment_slot,
                deployment_slot + 2,
                AtomicU64::new(usage_counter),
            ),
        );
        num_total_programs += 1;
        programs
            .entry(key)
            .or_default()
            .push((deployment_slot, usage_counter));
    };

    // Programs inserted at several deployment slots under the same key, listed as
    // (deployment slot, usage counter). The assertions below expect these to survive
    // every eviction pass.
    let redeployed_programs: [&[(u64, u64)]; 3] = [
        &[(0, 1), (10, 5), (20, 25)],
        &[(5, 0), (11, 10)],
        &[(0, 100), (5, 3), (15, 20)],
    ];
    for versions in redeployed_programs {
        let key = Pubkey::new_unique();
        for &(deployment_slot, usage_counter) in versions {
            insert_program(key, deployment_slot, usage_counter);
        }
    }

    // Add a set of used programs (with no redeploys), one on each possible slot
    // in the fork graph
    let possible_slots: [u64; 10] = [0, 5, 10, 11, 15, 16, 20, 22, 25, 27];
    let usage_counters: [u64; 10] = [43, 10, 1128, 1, 0, 67, 212, 322, 29, 21];
    let mut eviction_candidates = possible_slots
        .iter()
        .zip(usage_counters.iter())
        .map(|(&slot, &usage_counter)| (Pubkey::new_unique(), slot, usage_counter))
        .collect::<Vec<_>>();
    for &(key, deployment_slot, usage_counter) in &eviction_candidates {
        insert_program(key, deployment_slot, usage_counter);
    }
    // Least-used first, matching the order in which the cache is expected to evict.
    eviction_candidates.sort_by_key(|&(_key, _deployment_slot, usage_counter)| usage_counter);

    // Try to remove no programs.
    cache.sort_and_evict(Some(num_total_programs));
    // Check that every program is still in the cache.
    for key in programs.keys() {
        assert!(cache.entries.contains_key(key));
    }

    // Try to remove less than max programs.
    let max_cache_entries = 12_usize;
    // Guarantee you won't evict all eviction candidates
    let num_to_remove = num_total_programs - max_cache_entries;
    assert!(eviction_candidates.len() > num_to_remove);
    let removals = eviction_candidates
        .drain(..num_to_remove)
        .map(|(key, _, _)| key)
        .collect::<Vec<_>>();
    cache.sort_and_evict(Some(max_cache_entries));
    // Make sure removed entries are gone
    for key in &removals {
        assert!(!cache.entries.contains_key(key));
    }
    // Make sure the other entries are still present in the cache
    assert_survivors_intact(&cache, &programs, &removals);
    // Remove entries from your local cache tracker
    for key in &removals {
        programs.remove(key);
        num_total_programs -= 1;
    }

    // Try to remove all programs.
    let max_num_removals = eviction_candidates.len();
    // Make sure total programs is greater than number of eviction candidates
    assert!(num_total_programs > max_num_removals);
    cache.sort_and_evict(Some(0));
    // Make sure all candidate removals were removed
    let removals = eviction_candidates
        .iter()
        .map(|&(key, _, _)| key)
        .collect::<Vec<_>>();
    for key in &removals {
        assert!(!cache.entries.contains_key(key));
    }
    // Make sure all non-candidate removals remain
    assert_survivors_intact(&cache, &programs, &removals);
}
#[test]
fn test_tombstone() {
let tombstone = LoadedProgram::new_tombstone(0);
@ -550,12 +823,19 @@ mod tests {
}
/// Builds a test `LoadedProgram` for the given slots with a usage counter of zero.
///
/// Delegates to `new_test_loaded_program_with_usage`.
fn new_test_loaded_program(deployment_slot: Slot, effective_slot: Slot) -> Arc<LoadedProgram> {
    let zeroed_usage = AtomicU64::new(0);
    new_test_loaded_program_with_usage(deployment_slot, effective_slot, zeroed_usage)
}
/// Builds a test `LoadedProgram` with the given deployment slot, effective slot and
/// usage counter.
///
/// The program type is `LoadedProgramType::Invalid` and the account size is zero.
fn new_test_loaded_program_with_usage(
    deployment_slot: Slot,
    effective_slot: Slot,
    usage_counter: AtomicU64,
) -> Arc<LoadedProgram> {
    Arc::new(LoadedProgram {
        program: LoadedProgramType::Invalid,
        account_size: 0,
        deployment_slot,
        effective_slot,
        // Use the caller-supplied counter; the field must be initialised exactly once.
        usage_counter,
    })
}