Improve cache eviction policy for LoadedPrograms (#34391)

* Use 2's random selection to evict program cache

* implement decaying of usage counter

* replace RwLock with AtomicU64

* address review comments

* remove -> swap_remove
This commit is contained in:
Pankaj Garg 2023-12-18 14:51:36 -08:00 committed by GitHub
parent eb948b1ddc
commit 6f0133bd43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 282 additions and 33 deletions

View File

@ -3,9 +3,9 @@ use {
invoke_context::{BuiltinFunctionWithContext, InvokeContext},
timings::ExecuteDetailsTimings,
},
itertools::Itertools,
log::{debug, error, log_enabled, trace},
percentage::PercentageInteger,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_rbpf::{
elf::Executable,
@ -129,6 +129,8 @@ pub struct LoadedProgram {
pub tx_usage_counter: AtomicU64,
/// How often this entry was used by an instruction
pub ix_usage_counter: AtomicU64,
/// Latest slot in which the entry was used
pub latest_access_slot: AtomicU64,
}
#[derive(Debug, Default)]
@ -348,6 +350,7 @@ impl LoadedProgram {
tx_usage_counter: AtomicU64::new(0),
program,
ix_usage_counter: AtomicU64::new(0),
latest_access_slot: AtomicU64::new(0),
})
}
@ -360,6 +363,7 @@ impl LoadedProgram {
maybe_expiration_slot: self.maybe_expiration_slot,
tx_usage_counter: AtomicU64::new(self.tx_usage_counter.load(Ordering::Relaxed)),
ix_usage_counter: AtomicU64::new(self.ix_usage_counter.load(Ordering::Relaxed)),
latest_access_slot: AtomicU64::new(self.latest_access_slot.load(Ordering::Relaxed)),
})
}
@ -381,6 +385,7 @@ impl LoadedProgram {
tx_usage_counter: AtomicU64::new(0),
program: LoadedProgramType::Builtin(BuiltinProgram::new_builtin(function_registry)),
ix_usage_counter: AtomicU64::new(0),
latest_access_slot: AtomicU64::new(0),
}
}
@ -395,6 +400,7 @@ impl LoadedProgram {
maybe_expiration_slot,
tx_usage_counter: AtomicU64::default(),
ix_usage_counter: AtomicU64::default(),
latest_access_slot: AtomicU64::new(0),
};
debug_assert!(tombstone.is_tombstone());
tombstone
@ -416,6 +422,16 @@ impl LoadedProgram {
&& slot >= self.deployment_slot
&& slot < self.effective_slot
}
pub fn update_access_slot(&self, slot: Slot) {
let _ = self.latest_access_slot.fetch_max(slot, Ordering::Relaxed);
}
pub fn decayed_usage_counter(&self, now: Slot) -> u64 {
let last_access = self.latest_access_slot.load(Ordering::Relaxed);
let decaying_for = now.saturating_sub(last_access);
self.tx_usage_counter.load(Ordering::Relaxed) >> decaying_for
}
}
#[derive(Clone, Debug)]
@ -862,7 +878,6 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
if let LoadedProgramType::Unloaded(_environment) = &entry.program {
break;
}
entry.clone()
} else if entry.is_implicit_delay_visibility_tombstone(
loaded_programs_for_tx_batch.slot,
@ -877,6 +892,7 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
} else {
continue;
};
entry_to_return.update_access_slot(loaded_programs_for_tx_batch.slot);
entry_to_return
.tx_usage_counter
.fetch_add(*usage_count, Ordering::Relaxed);
@ -935,10 +951,8 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
})
}
/// Returns the list of loaded programs which are verified and compiled sorted by `tx_usage_counter`.
///
/// Entries from program runtime v1 and v2 can be individually filtered.
pub fn get_entries_sorted_by_tx_usage(
/// Returns the list of loaded programs which are verified and compiled.
pub fn get_flattened_entries(
&self,
include_program_runtime_v1: bool,
include_program_runtime_v2: bool,
@ -963,19 +977,54 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
_ => None,
})
})
.sorted_by_cached_key(|(_id, program)| program.tx_usage_counter.load(Ordering::Relaxed))
.collect()
}
/// Unloads programs which were used infrequently
pub fn sort_and_unload(&mut self, shrink_to: PercentageInteger) {
let sorted_candidates = self.get_entries_sorted_by_tx_usage(true, true);
let mut sorted_candidates = self.get_flattened_entries(true, true);
sorted_candidates
.sort_by_cached_key(|(_id, program)| program.tx_usage_counter.load(Ordering::Relaxed));
let num_to_unload = sorted_candidates
.len()
.saturating_sub(shrink_to.apply_to(MAX_LOADED_ENTRY_COUNT));
self.unload_program_entries(sorted_candidates.iter().take(num_to_unload));
}
/// Evicts programs using 2's random selection, choosing the least used program out of the two entries.
/// The eviction is performed enough number of times to reduce the cache usage to the given percentage.
pub fn evict_using_2s_random_selection(&mut self, shrink_to: PercentageInteger, now: Slot) {
let mut candidates = self.get_flattened_entries(true, true);
let num_to_unload = candidates
.len()
.saturating_sub(shrink_to.apply_to(MAX_LOADED_ENTRY_COUNT));
fn random_index_and_usage_counter(
candidates: &[(Pubkey, Arc<LoadedProgram>)],
now: Slot,
) -> (usize, u64) {
let mut rng = thread_rng();
let index = rng.gen_range(0..candidates.len());
let usage_counter = candidates
.get(index)
.expect("Failed to get cached entry")
.1
.decayed_usage_counter(now);
(index, usage_counter)
}
for _ in 0..num_to_unload {
let (index1, usage_counter1) = random_index_and_usage_counter(&candidates, now);
let (index2, usage_counter2) = random_index_and_usage_counter(&candidates, now);
let (program, entry) = if usage_counter1 < usage_counter2 {
candidates.swap_remove(index1)
} else {
candidates.swap_remove(index2)
};
self.unload_program_entry(&program, &entry);
}
}
/// Removes all the entries at the given keys, if they exist
pub fn remove_programs(&mut self, keys: impl Iterator<Item = Pubkey>) {
for k in keys {
@ -993,6 +1042,11 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
.entry(*id)
.and_modify(|c| saturating_add_assign!(*c, 1))
.or_insert(1);
} else {
error!(
"Failed to create an unloaded cache entry for a program type {:?}",
entry.program
);
}
}
}
@ -1003,30 +1057,38 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
keys.iter().for_each(|key| self.unload_program(key));
}
/// This function removes the given entry for the given program from the cache.
/// The function expects that the program and entry exists in the cache. Otherwise it'll panic.
fn unload_program_entry(&mut self, program: &Pubkey, remove_entry: &Arc<LoadedProgram>) {
let second_level = self.entries.get_mut(program).expect("Cache lookup failed");
let candidate = second_level
.slot_versions
.iter_mut()
.find(|entry| entry == &remove_entry)
.expect("Program entry not found");
// Certain entry types cannot be unloaded, such as tombstones, or already unloaded entries.
// For such entries, `to_unloaded()` will return None.
// These entry types do not occupy much memory.
if let Some(unloaded) = candidate.to_unloaded() {
if candidate.tx_usage_counter.load(Ordering::Relaxed) == 1 {
self.stats.one_hit_wonders.fetch_add(1, Ordering::Relaxed);
}
self.stats
.evictions
.entry(*program)
.and_modify(|c| saturating_add_assign!(*c, 1))
.or_insert(1);
*candidate = Arc::new(unloaded);
}
}
fn unload_program_entries<'a>(
&mut self,
remove: impl Iterator<Item = &'a (Pubkey, Arc<LoadedProgram>)>,
) {
for (id, program) in remove {
if let Some(second_level) = self.entries.get_mut(id) {
if let Some(candidate) = second_level
.slot_versions
.iter_mut()
.find(|entry| entry == &program)
{
if let Some(unloaded) = candidate.to_unloaded() {
if candidate.tx_usage_counter.load(Ordering::Relaxed) == 1 {
self.stats.one_hit_wonders.fetch_add(1, Ordering::Relaxed);
}
self.stats
.evictions
.entry(*id)
.and_modify(|c| saturating_add_assign!(*c, 1))
.or_insert(1);
*candidate = Arc::new(unloaded);
}
}
}
for (program, entry) in remove {
self.unload_program_entry(program, entry);
}
}
@ -1125,6 +1187,7 @@ mod tests {
maybe_expiration_slot: expiry,
tx_usage_counter: usage_counter,
ix_usage_counter: AtomicU64::default(),
latest_access_slot: AtomicU64::new(deployment_slot),
})
}
@ -1137,6 +1200,7 @@ mod tests {
maybe_expiration_slot: None,
tx_usage_counter: AtomicU64::default(),
ix_usage_counter: AtomicU64::default(),
latest_access_slot: AtomicU64::default(),
})
}
@ -1165,6 +1229,7 @@ mod tests {
maybe_expiration_slot: None,
tx_usage_counter: AtomicU64::default(),
ix_usage_counter: AtomicU64::default(),
latest_access_slot: AtomicU64::default(),
}
.to_unloaded()
.expect("Failed to unload the program"),
@ -1190,6 +1255,181 @@ mod tests {
.sum()
}
#[test]
fn test_usage_counter_decay() {
let _cache = new_mock_cache::<TestForkGraph>();
let program = new_test_loaded_program_with_usage(10, 11, AtomicU64::new(32));
program.update_access_slot(15);
assert_eq!(program.decayed_usage_counter(15), 32);
assert_eq!(program.decayed_usage_counter(16), 16);
assert_eq!(program.decayed_usage_counter(17), 8);
assert_eq!(program.decayed_usage_counter(18), 4);
assert_eq!(program.decayed_usage_counter(19), 2);
assert_eq!(program.decayed_usage_counter(20), 1);
assert_eq!(program.decayed_usage_counter(21), 0);
assert_eq!(program.decayed_usage_counter(15), 32);
assert_eq!(program.decayed_usage_counter(14), 32);
program.update_access_slot(18);
assert_eq!(program.decayed_usage_counter(15), 32);
assert_eq!(program.decayed_usage_counter(16), 32);
assert_eq!(program.decayed_usage_counter(17), 32);
assert_eq!(program.decayed_usage_counter(18), 32);
assert_eq!(program.decayed_usage_counter(19), 16);
assert_eq!(program.decayed_usage_counter(20), 8);
assert_eq!(program.decayed_usage_counter(21), 4);
}
#[test]
fn test_random_eviction() {
let mut programs = vec![];
let mut cache = new_mock_cache::<TestForkGraph>();
// This test adds different kind of entries to the cache.
// Tombstones and unloaded entries are expected to not be evicted.
// It also adds multiple entries for three programs as it tries to create a typical cache instance.
let program1 = Pubkey::new_unique();
let program1_deployment_slots = [0, 10, 20];
let program1_usage_counters = [4, 5, 25];
program1_deployment_slots
.iter()
.enumerate()
.for_each(|(i, deployment_slot)| {
let usage_counter = *program1_usage_counters.get(i).unwrap_or(&0);
cache.replenish(
program1,
new_test_loaded_program_with_usage(
*deployment_slot,
(*deployment_slot) + 2,
AtomicU64::new(usage_counter),
),
);
programs.push((program1, *deployment_slot, usage_counter));
});
let env = Arc::new(BuiltinProgram::new_mock());
for slot in 21..31 {
set_tombstone(
&mut cache,
program1,
slot,
LoadedProgramType::FailedVerification(env.clone()),
);
}
for slot in 31..41 {
insert_unloaded_program(&mut cache, program1, slot);
}
let program2 = Pubkey::new_unique();
let program2_deployment_slots = [5, 11];
let program2_usage_counters = [0, 2];
program2_deployment_slots
.iter()
.enumerate()
.for_each(|(i, deployment_slot)| {
let usage_counter = *program2_usage_counters.get(i).unwrap_or(&0);
cache.replenish(
program2,
new_test_loaded_program_with_usage(
*deployment_slot,
(*deployment_slot) + 2,
AtomicU64::new(usage_counter),
),
);
programs.push((program2, *deployment_slot, usage_counter));
});
for slot in 21..31 {
set_tombstone(
&mut cache,
program2,
slot,
LoadedProgramType::DelayVisibility,
);
}
for slot in 31..41 {
insert_unloaded_program(&mut cache, program2, slot);
}
let program3 = Pubkey::new_unique();
let program3_deployment_slots = [0, 5, 15];
let program3_usage_counters = [100, 3, 20];
program3_deployment_slots
.iter()
.enumerate()
.for_each(|(i, deployment_slot)| {
let usage_counter = *program3_usage_counters.get(i).unwrap_or(&0);
cache.replenish(
program3,
new_test_loaded_program_with_usage(
*deployment_slot,
(*deployment_slot) + 2,
AtomicU64::new(usage_counter),
),
);
programs.push((program3, *deployment_slot, usage_counter));
});
for slot in 21..31 {
set_tombstone(&mut cache, program3, slot, LoadedProgramType::Closed);
}
for slot in 31..41 {
insert_unloaded_program(&mut cache, program3, slot);
}
programs.sort_by_key(|(_id, _slot, usage_count)| *usage_count);
let num_loaded = num_matching_entries(&cache, |program_type| {
matches!(program_type, LoadedProgramType::TestLoaded(_))
});
let num_unloaded = num_matching_entries(&cache, |program_type| {
matches!(program_type, LoadedProgramType::Unloaded(_))
});
let num_tombstones = num_matching_entries(&cache, |program_type| {
matches!(
program_type,
LoadedProgramType::DelayVisibility
| LoadedProgramType::FailedVerification(_)
| LoadedProgramType::Closed
)
});
// Test that the cache is constructed with the expected number of entries.
assert_eq!(num_loaded, 8);
assert_eq!(num_unloaded, 30);
assert_eq!(num_tombstones, 30);
// Evicting to 2% should update cache with
// * 5 active entries
// * 33 unloaded entries (3 active programs will get unloaded)
// * 30 tombstones (tombstones are not evicted)
cache.evict_using_2s_random_selection(Percentage::from(2), 21);
let num_loaded = num_matching_entries(&cache, |program_type| {
matches!(program_type, LoadedProgramType::TestLoaded(_))
});
let num_unloaded = num_matching_entries(&cache, |program_type| {
matches!(program_type, LoadedProgramType::Unloaded(_))
});
let num_tombstones = num_matching_entries(&cache, |program_type| {
matches!(
program_type,
LoadedProgramType::DelayVisibility
| LoadedProgramType::FailedVerification(_)
| LoadedProgramType::Closed
)
});
// Test that expected number of loaded entries get evicted/unloaded.
assert_eq!(num_loaded, 5);
assert_eq!(num_unloaded, 33);
assert_eq!(num_tombstones, 30);
}
#[test]
fn test_eviction() {
let mut programs = vec![];
@ -1582,6 +1822,7 @@ mod tests {
maybe_expiration_slot: None,
tx_usage_counter: AtomicU64::default(),
ix_usage_counter: AtomicU64::default(),
latest_access_slot: AtomicU64::default(),
});
let (existing, program) = cache.replenish(program1, updated_program.clone());
assert!(!existing);
@ -1874,6 +2115,7 @@ mod tests {
maybe_expiration_slot: Some(21),
tx_usage_counter: AtomicU64::default(),
ix_usage_counter: AtomicU64::default(),
latest_access_slot: AtomicU64::default(),
});
assert!(!cache.replenish(program4, test_program).0);
@ -2217,6 +2459,7 @@ mod tests {
maybe_expiration_slot: Some(15),
tx_usage_counter: AtomicU64::default(),
ix_usage_counter: AtomicU64::default(),
latest_access_slot: AtomicU64::default(),
});
assert!(!cache.replenish(program1, test_program).0);

View File

@ -4021,6 +4021,7 @@ mod tests {
maybe_expiration_slot: None,
tx_usage_counter: AtomicU64::new(100),
ix_usage_counter: AtomicU64::new(100),
latest_access_slot: AtomicU64::new(0),
};
invoke_context
.programs_modified_by_tx
@ -4061,6 +4062,7 @@ mod tests {
maybe_expiration_slot: None,
tx_usage_counter: AtomicU64::new(100),
ix_usage_counter: AtomicU64::new(100),
latest_access_slot: AtomicU64::new(0),
};
invoke_context
.programs_modified_by_tx

View File

@ -1476,10 +1476,10 @@ impl Bank {
}
loaded_programs_cache.upcoming_environments = Some(upcoming_environments);
loaded_programs_cache.programs_to_recompile = loaded_programs_cache
.get_entries_sorted_by_tx_usage(
changed_program_runtime_v1,
changed_program_runtime_v2,
);
.get_flattened_entries(changed_program_runtime_v1, changed_program_runtime_v2);
loaded_programs_cache
.programs_to_recompile
.sort_by_cached_key(|(_id, program)| program.decayed_usage_counter(slot));
}
});
@ -4799,6 +4799,7 @@ impl Bank {
loaded_program.ix_usage_counter =
AtomicU64::new(recompile.ix_usage_counter.load(Ordering::Relaxed));
}
loaded_program.update_access_slot(self.slot());
Arc::new(loaded_program)
}
@ -5287,7 +5288,10 @@ impl Bank {
self.loaded_programs_cache
.write()
.unwrap()
.sort_and_unload(Percentage::from(SHRINK_LOADED_PROGRAMS_TO_PERCENTAGE));
.evict_using_2s_random_selection(
Percentage::from(SHRINK_LOADED_PROGRAMS_TO_PERCENTAGE),
self.slot(),
);
debug!(
"check: {}us load: {}us execute: {}us txs_len={}",