Refactor - `LoadedPrograms::replenish()` (#35145)

* Replace LoadedPrograms::replenish() with LoadedPrograms::assign_program().

* Removes LoadedPrograms::replenish().

* Defines replacement by having the same loaded program type.

* Implements a proper insertion sort with a two key comparison operator.
This commit is contained in:
Alexander Meißner 2024-02-14 12:13:53 +01:00 committed by GitHub
parent 2d09e4965e
commit 716ad5441b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 123 additions and 119 deletions

View File

@ -696,24 +696,20 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
&self.environments
}
/// Refill the cache with a single entry. It's typically called during transaction loading,
/// Insert a single entry. It's typically called during transaction loading,
/// when the cache doesn't contain the entry corresponding to program `key`.
/// The function dedupes the cache, in case some other thread replenished the entry in parallel.
pub fn replenish(
&mut self,
key: Pubkey,
entry: Arc<LoadedProgram>,
) -> (bool, Arc<LoadedProgram>) {
pub fn assign_program(&mut self, key: Pubkey, entry: Arc<LoadedProgram>) -> bool {
let slot_versions = &mut self.entries.entry(key).or_default().slot_versions;
let index = slot_versions
.iter()
.position(|at| at.effective_slot >= entry.effective_slot);
if let Some(existing) = index.and_then(|index| slot_versions.get_mut(index)) {
if existing.deployment_slot == entry.deployment_slot
&& existing.effective_slot == entry.effective_slot
{
if matches!(existing.program, LoadedProgramType::Unloaded(_)) {
// The unloaded program is getting reloaded
match slot_versions.binary_search_by(|at| {
at.effective_slot
.cmp(&entry.effective_slot)
.then(at.deployment_slot.cmp(&entry.deployment_slot))
}) {
Ok(index) => {
let existing = slot_versions.get_mut(index).unwrap();
if std::mem::discriminant(&existing.program)
!= std::mem::discriminant(&entry.program)
{
// Copy over the usage counter to the new entry
entry.tx_usage_counter.fetch_add(
existing.tx_usage_counter.load(Ordering::Relaxed),
@ -723,34 +719,21 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
existing.ix_usage_counter.load(Ordering::Relaxed),
Ordering::Relaxed,
);
*existing = entry.clone();
self.stats.reloads.fetch_add(1, Ordering::Relaxed);
} else if existing.is_tombstone() != entry.is_tombstone() {
// Either the old entry is tombstone and the new one is not.
// (Let's give the new entry a chance).
// Or, the old entry is not a tombstone and the new one is a tombstone.
// (Remove the old entry, as the tombstone makes it obsolete).
self.stats.insertions.fetch_add(1, Ordering::Relaxed);
false
} else {
// Something is wrong, I can feel it ...
self.stats.replacements.fetch_add(1, Ordering::Relaxed);
return (true, existing.clone());
true
}
*existing = entry.clone();
return (false, entry);
}
Err(index) => {
self.stats.insertions.fetch_add(1, Ordering::Relaxed);
slot_versions.insert(index, entry.clone());
false
}
}
self.stats.insertions.fetch_add(1, Ordering::Relaxed);
slot_versions.insert(index.unwrap_or(slot_versions.len()), entry.clone());
(false, entry)
}
/// Assign the program `entry` to the given `key` in the cache.
/// This is typically called when a deployed program is managed (un-/re-/deployed) via
/// loader instructions. Because of the cooldown, entires can not have the same
/// deployment_slot and effective_slot.
pub fn assign_program(&mut self, key: Pubkey, entry: Arc<LoadedProgram>) -> Arc<LoadedProgram> {
let (was_occupied, entry) = self.replenish(key, entry);
debug_assert!(!was_occupied);
entry
}
pub fn prune_by_deployment_slot(&mut self, slot: Slot) {
@ -986,7 +969,7 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) {
tx_batch_cache.entries.iter().for_each(|(key, entry)| {
self.replenish(*key, entry.clone());
self.assign_program(*key, entry.clone());
})
}
@ -1233,7 +1216,9 @@ mod tests {
slot: Slot,
reason: LoadedProgramType,
) -> Arc<LoadedProgram> {
cache.assign_program(key, Arc::new(LoadedProgram::new_tombstone(slot, reason)))
let program = Arc::new(LoadedProgram::new_tombstone(slot, reason));
cache.assign_program(key, program.clone());
program
}
fn insert_unloaded_program<FG: ForkGraph>(
@ -1256,7 +1241,8 @@ mod tests {
.to_unloaded()
.expect("Failed to unload the program"),
);
cache.replenish(key, unloaded).1
cache.assign_program(key, unloaded.clone());
unloaded
}
fn num_matching_entries<P, FG>(cache: &LoadedPrograms<FG>, predicate: P) -> usize
@ -1323,7 +1309,7 @@ mod tests {
.enumerate()
.for_each(|(i, deployment_slot)| {
let usage_counter = *program1_usage_counters.get(i).unwrap_or(&0);
cache.replenish(
cache.assign_program(
program1,
new_test_loaded_program_with_usage(
*deployment_slot,
@ -1356,7 +1342,7 @@ mod tests {
.enumerate()
.for_each(|(i, deployment_slot)| {
let usage_counter = *program2_usage_counters.get(i).unwrap_or(&0);
cache.replenish(
cache.assign_program(
program2,
new_test_loaded_program_with_usage(
*deployment_slot,
@ -1388,7 +1374,7 @@ mod tests {
.enumerate()
.for_each(|(i, deployment_slot)| {
let usage_counter = *program3_usage_counters.get(i).unwrap_or(&0);
cache.replenish(
cache.assign_program(
program3,
new_test_loaded_program_with_usage(
*deployment_slot,
@ -1470,7 +1456,7 @@ mod tests {
.enumerate()
.for_each(|(i, deployment_slot)| {
let usage_counter = *program1_usage_counters.get(i).unwrap_or(&0);
cache.replenish(
cache.assign_program(
program1,
new_test_loaded_program_with_usage(
*deployment_slot,
@ -1503,7 +1489,7 @@ mod tests {
.enumerate()
.for_each(|(i, deployment_slot)| {
let usage_counter = *program2_usage_counters.get(i).unwrap_or(&0);
cache.replenish(
cache.assign_program(
program2,
new_test_loaded_program_with_usage(
*deployment_slot,
@ -1535,7 +1521,7 @@ mod tests {
.enumerate()
.for_each(|(i, deployment_slot)| {
let usage_counter = *program3_usage_counters.get(i).unwrap_or(&0);
cache.replenish(
cache.assign_program(
program3,
new_test_loaded_program_with_usage(
*deployment_slot,
@ -1628,7 +1614,7 @@ mod tests {
let program = Pubkey::new_unique();
let num_total_programs = 6;
(0..num_total_programs).for_each(|i| {
cache.replenish(
cache.assign_program(
program,
new_test_loaded_program_with_usage(i, i + 2, AtomicU64::new(i + 10)),
);
@ -1655,7 +1641,7 @@ mod tests {
// Replenish the program that was just unloaded. Use 0 as the usage counter. This should be
// updated with the usage counter from the unloaded program.
cache.replenish(
cache.assign_program(
program,
new_test_loaded_program_with_usage(0, 2, AtomicU64::new(0)),
);
@ -1674,21 +1660,63 @@ mod tests {
}
#[test]
fn test_replace_tombstones() {
fn test_fuzz_assign_program_order() {
use rand::prelude::SliceRandom;
const EXPECTED_ENTRIES: [(u64, u64); 7] =
[(1, 2), (5, 5), (5, 6), (5, 10), (9, 10), (10, 10), (3, 12)];
let mut rng = rand::thread_rng();
let program_id = Pubkey::new_unique();
for _ in 0..1000 {
let mut entries = EXPECTED_ENTRIES.to_vec();
entries.shuffle(&mut rng);
let mut cache = new_mock_cache::<TestForkGraph>();
for (deployment_slot, effective_slot) in entries {
assert!(!cache.assign_program(
program_id,
new_test_loaded_program(deployment_slot, effective_slot)
));
}
for ((deployment_slot, effective_slot), entry) in EXPECTED_ENTRIES
.iter()
.zip(cache.entries.get(&program_id).unwrap().slot_versions.iter())
{
assert_eq!(entry.deployment_slot, *deployment_slot);
assert_eq!(entry.effective_slot, *effective_slot);
}
}
}
#[test]
fn test_assign_program_tombstones() {
let mut cache = new_mock_cache::<TestForkGraph>();
let program1 = Pubkey::new_unique();
let env = Arc::new(BuiltinProgram::new_mock());
let env = cache.environments.program_runtime_v1.clone();
set_tombstone(
&mut cache,
program1,
10,
LoadedProgramType::FailedVerification(env),
LoadedProgramType::FailedVerification(env.clone()),
);
assert_eq!(cache.entries.get(&program1).unwrap().slot_versions.len(), 1);
set_tombstone(&mut cache, program1, 10, LoadedProgramType::Closed);
assert_eq!(cache.entries.get(&program1).unwrap().slot_versions.len(), 1);
set_tombstone(
&mut cache,
program1,
10,
LoadedProgramType::FailedVerification(env.clone()),
);
assert_eq!(cache.entries.get(&program1).unwrap().slot_versions.len(), 1);
let loaded_program = new_test_loaded_program(10, 10);
let (existing, program) = cache.replenish(program1, loaded_program.clone());
assert!(!existing);
assert_eq!(program, loaded_program);
// Fail on exact replacement
assert!(cache.assign_program(
program1,
Arc::new(LoadedProgram::new_tombstone(
10,
LoadedProgramType::FailedVerification(env)
))
));
}
#[test]
@ -1726,11 +1754,7 @@ mod tests {
// Add a program at slot 50, and a tombstone for the program at slot 60
let program2 = Pubkey::new_unique();
assert!(
!cache
.replenish(program2, new_test_builtin_program(50, 51))
.0
);
cache.assign_program(program2, new_test_builtin_program(50, 51));
let second_level = &cache
.entries
.get(&program2)
@ -1830,10 +1854,7 @@ mod tests {
cache.set_fork_graph(fork_graph);
let program1 = Pubkey::new_unique();
let loaded_program = new_test_loaded_program(10, 10);
let (existing, program) = cache.replenish(program1, loaded_program.clone());
assert!(!existing);
assert_eq!(program, loaded_program);
cache.assign_program(program1, new_test_loaded_program(10, 10));
let new_env = Arc::new(BuiltinProgram::new_mock());
cache.upcoming_environments = Some(ProgramRuntimeEnvironments {
@ -1849,9 +1870,7 @@ mod tests {
ix_usage_counter: AtomicU64::default(),
latest_access_slot: AtomicU64::default(),
});
let (existing, program) = cache.replenish(program1, updated_program.clone());
assert!(!existing);
assert_eq!(program, updated_program);
cache.assign_program(program1, updated_program.clone());
// Test that there are 2 entries for the program
assert_eq!(
@ -1986,38 +2005,27 @@ mod tests {
cache.set_fork_graph(fork_graph);
let program1 = Pubkey::new_unique();
assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0);
assert!(!cache.replenish(program1, new_test_loaded_program(10, 11)).0);
assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0);
// Test: inserting duplicate entry return pre existing entry from the cache
assert!(cache.replenish(program1, new_test_loaded_program(20, 21)).0);
cache.assign_program(program1, new_test_loaded_program(0, 1));
cache.assign_program(program1, new_test_loaded_program(10, 11));
cache.assign_program(program1, new_test_loaded_program(20, 21));
let program2 = Pubkey::new_unique();
assert!(!cache.replenish(program2, new_test_loaded_program(5, 6)).0);
assert!(
!cache
.replenish(
program2,
new_test_loaded_program(11, 11 + DELAY_VISIBILITY_SLOT_OFFSET)
)
.0
cache.assign_program(program2, new_test_loaded_program(5, 6));
cache.assign_program(
program2,
new_test_loaded_program(11, 11 + DELAY_VISIBILITY_SLOT_OFFSET),
);
let program3 = Pubkey::new_unique();
assert!(!cache.replenish(program3, new_test_loaded_program(25, 26)).0);
cache.assign_program(program3, new_test_loaded_program(25, 26));
let program4 = Pubkey::new_unique();
assert!(!cache.replenish(program4, new_test_loaded_program(0, 1)).0);
assert!(!cache.replenish(program4, new_test_loaded_program(5, 6)).0);
cache.assign_program(program4, new_test_loaded_program(0, 1));
cache.assign_program(program4, new_test_loaded_program(5, 6));
// The following is a special case, where effective slot is 3 slots in the future
assert!(
!cache
.replenish(
program4,
new_test_loaded_program(15, 15 + DELAY_VISIBILITY_SLOT_OFFSET)
)
.0
cache.assign_program(
program4,
new_test_loaded_program(15, 15 + DELAY_VISIBILITY_SLOT_OFFSET),
);
// Current fork graph
@ -2243,15 +2251,15 @@ mod tests {
cache.set_fork_graph(fork_graph);
let program1 = Pubkey::new_unique();
assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0);
assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0);
cache.assign_program(program1, new_test_loaded_program(0, 1));
cache.assign_program(program1, new_test_loaded_program(20, 21));
let program2 = Pubkey::new_unique();
assert!(!cache.replenish(program2, new_test_loaded_program(5, 6)).0);
assert!(!cache.replenish(program2, new_test_loaded_program(11, 12)).0);
cache.assign_program(program2, new_test_loaded_program(5, 6));
cache.assign_program(program2, new_test_loaded_program(11, 12));
let program3 = Pubkey::new_unique();
assert!(!cache.replenish(program3, new_test_loaded_program(25, 26)).0);
cache.assign_program(program3, new_test_loaded_program(25, 26));
// Testing fork 0 - 5 - 11 - 15 - 16 - 19 - 21 - 23 with current slot at 19
let mut missing = vec![
@ -2316,12 +2324,12 @@ mod tests {
cache.set_fork_graph(fork_graph);
let program1 = Pubkey::new_unique();
assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0);
assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0);
cache.assign_program(program1, new_test_loaded_program(0, 1));
cache.assign_program(program1, new_test_loaded_program(20, 21));
let program2 = Pubkey::new_unique();
assert!(!cache.replenish(program2, new_test_loaded_program(5, 6)).0);
assert!(!cache.replenish(program2, new_test_loaded_program(11, 12)).0);
cache.assign_program(program2, new_test_loaded_program(5, 6));
cache.assign_program(program2, new_test_loaded_program(11, 12));
let program3 = Pubkey::new_unique();
// Insert an unloaded program with correct/cache's environment at slot 25
@ -2330,17 +2338,13 @@ mod tests {
// Insert another unloaded program with a different environment at slot 20
// Since this entry's environment won't match cache's environment, looking up this
// entry should return missing instead of unloaded entry.
assert!(
!cache
.replenish(
program3,
Arc::new(
new_test_loaded_program(20, 21)
.to_unloaded()
.expect("Failed to create unloaded program")
)
)
.0
cache.assign_program(
program3,
Arc::new(
new_test_loaded_program(20, 21)
.to_unloaded()
.expect("Failed to create unloaded program"),
),
);
// Testing fork 0 - 5 - 11 - 15 - 16 - 19 - 21 - 23 with current slot at 19
@ -2407,8 +2411,8 @@ mod tests {
cache.set_fork_graph(fork_graph);
let program1 = Pubkey::new_unique();
assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0);
assert!(!cache.replenish(program1, new_test_loaded_program(5, 6)).0);
cache.assign_program(program1, new_test_loaded_program(0, 1));
cache.assign_program(program1, new_test_loaded_program(5, 6));
cache.prune(10, 0);
@ -2447,11 +2451,11 @@ mod tests {
cache.set_fork_graph(fork_graph);
let program1 = Pubkey::new_unique();
assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0);
assert!(!cache.replenish(program1, new_test_loaded_program(5, 6)).0);
cache.assign_program(program1, new_test_loaded_program(0, 1));
cache.assign_program(program1, new_test_loaded_program(5, 6));
let program2 = Pubkey::new_unique();
assert!(!cache.replenish(program2, new_test_loaded_program(10, 11)).0);
cache.assign_program(program2, new_test_loaded_program(10, 11));
let mut missing = vec![
(program1, (LoadedProgramMatchCriteria::NoCriteria, 1)),

View File

@ -1363,7 +1363,7 @@ impl Bank {
drop(loaded_programs_cache);
let recompiled = new.load_program(&key, false, Some(program_to_recompile));
let mut loaded_programs_cache = new.loaded_programs_cache.write().unwrap();
loaded_programs_cache.replenish(key, recompiled);
loaded_programs_cache.assign_program(key, recompiled);
}
} else if new.epoch() != loaded_programs_cache.latest_root_epoch
|| slot_index.saturating_add(slots_in_recompilation_phase) >= slots_in_epoch
@ -7056,7 +7056,7 @@ impl Bank {
self.loaded_programs_cache
.write()
.unwrap()
.replenish(program_id, Arc::new(builtin));
.assign_program(program_id, Arc::new(builtin));
debug!("Added program {} under {:?}", name, program_id);
}