From 716ad5441bb35151c934c5f131608b6767ff6e3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Mei=C3=9Fner?= Date: Wed, 14 Feb 2024 12:13:53 +0100 Subject: [PATCH] Refactor - `LoadedPrograms::replenish()` (#35145) * Replace LoadedPrograms::replenish() with LoadedPrograms::assign_program(). * Removes LoadedPrograms::replenish(). * Defines replacement by having the same loaded program type. * Implements a proper insertion sort with a two key comparison operator. --- program-runtime/src/loaded_programs.rs | 238 +++++++++++++------------ runtime/src/bank.rs | 4 +- 2 files changed, 123 insertions(+), 119 deletions(-) diff --git a/program-runtime/src/loaded_programs.rs b/program-runtime/src/loaded_programs.rs index 19f5f7486..a760660b0 100644 --- a/program-runtime/src/loaded_programs.rs +++ b/program-runtime/src/loaded_programs.rs @@ -696,24 +696,20 @@ impl LoadedPrograms { &self.environments } - /// Refill the cache with a single entry. It's typically called during transaction loading, + /// Insert a single entry. It's typically called during transaction loading, /// when the cache doesn't contain the entry corresponding to program `key`. - /// The function dedupes the cache, in case some other thread replenished the entry in parallel. - pub fn replenish( - &mut self, - key: Pubkey, - entry: Arc, - ) -> (bool, Arc) { + pub fn assign_program(&mut self, key: Pubkey, entry: Arc) -> bool { let slot_versions = &mut self.entries.entry(key).or_default().slot_versions; - let index = slot_versions - .iter() - .position(|at| at.effective_slot >= entry.effective_slot); - if let Some(existing) = index.and_then(|index| slot_versions.get_mut(index)) { - if existing.deployment_slot == entry.deployment_slot - && existing.effective_slot == entry.effective_slot - { - if matches!(existing.program, LoadedProgramType::Unloaded(_)) { - // The unloaded program is getting reloaded + match slot_versions.binary_search_by(|at| { + at.effective_slot + .cmp(&entry.effective_slot) + .then(at.deployment_slot.cmp(&entry.deployment_slot)) + }) { + Ok(index) => { + let existing = slot_versions.get_mut(index).unwrap(); + if std::mem::discriminant(&existing.program) + != std::mem::discriminant(&entry.program) + { // Copy over the usage counter to the new entry entry.tx_usage_counter.fetch_add( existing.tx_usage_counter.load(Ordering::Relaxed), @@ -723,34 +719,21 @@ impl LoadedPrograms { existing.ix_usage_counter.load(Ordering::Relaxed), Ordering::Relaxed, ); + *existing = entry.clone(); self.stats.reloads.fetch_add(1, Ordering::Relaxed); - } else if existing.is_tombstone() != entry.is_tombstone() { - // Either the old entry is tombstone and the new one is not. - // (Let's give the new entry a chance). - // Or, the old entry is not a tombstone and the new one is a tombstone. - // (Remove the old entry, as the tombstone makes it obsolete). - self.stats.insertions.fetch_add(1, Ordering::Relaxed); + false } else { + // Something is wrong, I can feel it ... self.stats.replacements.fetch_add(1, Ordering::Relaxed); - return (true, existing.clone()); + true } - *existing = entry.clone(); - return (false, entry); + } + Err(index) => { + self.stats.insertions.fetch_add(1, Ordering::Relaxed); + slot_versions.insert(index, entry.clone()); + false } } - self.stats.insertions.fetch_add(1, Ordering::Relaxed); - slot_versions.insert(index.unwrap_or(slot_versions.len()), entry.clone()); - (false, entry) - } - - /// Assign the program `entry` to the given `key` in the cache. - /// This is typically called when a deployed program is managed (un-/re-/deployed) via - /// loader instructions. Because of the cooldown, entires can not have the same - /// deployment_slot and effective_slot. - pub fn assign_program(&mut self, key: Pubkey, entry: Arc) -> Arc { - let (was_occupied, entry) = self.replenish(key, entry); - debug_assert!(!was_occupied); - entry } pub fn prune_by_deployment_slot(&mut self, slot: Slot) { @@ -986,7 +969,7 @@ impl LoadedPrograms { pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) { tx_batch_cache.entries.iter().for_each(|(key, entry)| { - self.replenish(*key, entry.clone()); + self.assign_program(*key, entry.clone()); }) } @@ -1233,7 +1216,9 @@ mod tests { slot: Slot, reason: LoadedProgramType, ) -> Arc { - cache.assign_program(key, Arc::new(LoadedProgram::new_tombstone(slot, reason))) + let program = Arc::new(LoadedProgram::new_tombstone(slot, reason)); + cache.assign_program(key, program.clone()); + program } fn insert_unloaded_program( @@ -1256,7 +1241,8 @@ mod tests { .to_unloaded() .expect("Failed to unload the program"), ); - cache.replenish(key, unloaded).1 + cache.assign_program(key, unloaded.clone()); + unloaded } fn num_matching_entries(cache: &LoadedPrograms, predicate: P) -> usize @@ -1323,7 +1309,7 @@ mod tests { .enumerate() .for_each(|(i, deployment_slot)| { let usage_counter = *program1_usage_counters.get(i).unwrap_or(&0); - cache.replenish( + cache.assign_program( program1, new_test_loaded_program_with_usage( *deployment_slot, @@ -1356,7 +1342,7 @@ mod tests { .enumerate() .for_each(|(i, deployment_slot)| { let usage_counter = *program2_usage_counters.get(i).unwrap_or(&0); - cache.replenish( + cache.assign_program( program2, new_test_loaded_program_with_usage( *deployment_slot, @@ -1388,7 +1374,7 @@ mod tests { .enumerate() .for_each(|(i, deployment_slot)| { let usage_counter = *program3_usage_counters.get(i).unwrap_or(&0); - cache.replenish( + cache.assign_program( program3, new_test_loaded_program_with_usage( *deployment_slot, @@ -1470,7 +1456,7 @@ mod tests { .enumerate() .for_each(|(i, deployment_slot)| { let usage_counter = *program1_usage_counters.get(i).unwrap_or(&0); - cache.replenish( + cache.assign_program( program1, new_test_loaded_program_with_usage( *deployment_slot, @@ -1503,7 +1489,7 @@ mod tests { .enumerate() .for_each(|(i, deployment_slot)| { let usage_counter = *program2_usage_counters.get(i).unwrap_or(&0); - cache.replenish( + cache.assign_program( program2, new_test_loaded_program_with_usage( *deployment_slot, @@ -1535,7 +1521,7 @@ mod tests { .enumerate() .for_each(|(i, deployment_slot)| { let usage_counter = *program3_usage_counters.get(i).unwrap_or(&0); - cache.replenish( + cache.assign_program( program3, new_test_loaded_program_with_usage( *deployment_slot, @@ -1628,7 +1614,7 @@ mod tests { let program = Pubkey::new_unique(); let num_total_programs = 6; (0..num_total_programs).for_each(|i| { - cache.replenish( + cache.assign_program( program, new_test_loaded_program_with_usage(i, i + 2, AtomicU64::new(i + 10)), ); @@ -1655,7 +1641,7 @@ mod tests { // Replenish the program that was just unloaded. Use 0 as the usage counter. This should be // updated with the usage counter from the unloaded program. - cache.replenish( + cache.assign_program( program, new_test_loaded_program_with_usage(0, 2, AtomicU64::new(0)), ); @@ -1674,21 +1660,63 @@ mod tests { } #[test] - fn test_replace_tombstones() { + fn test_fuzz_assign_program_order() { + use rand::prelude::SliceRandom; + const EXPECTED_ENTRIES: [(u64, u64); 7] = + [(1, 2), (5, 5), (5, 6), (5, 10), (9, 10), (10, 10), (3, 12)]; + let mut rng = rand::thread_rng(); + let program_id = Pubkey::new_unique(); + for _ in 0..1000 { + let mut entries = EXPECTED_ENTRIES.to_vec(); + entries.shuffle(&mut rng); + let mut cache = new_mock_cache::(); + for (deployment_slot, effective_slot) in entries { + assert!(!cache.assign_program( + program_id, + new_test_loaded_program(deployment_slot, effective_slot) + )); + } + for ((deployment_slot, effective_slot), entry) in EXPECTED_ENTRIES + .iter() + .zip(cache.entries.get(&program_id).unwrap().slot_versions.iter()) + { + assert_eq!(entry.deployment_slot, *deployment_slot); + assert_eq!(entry.effective_slot, *effective_slot); + } + } + } + + #[test] + fn test_assign_program_tombstones() { let mut cache = new_mock_cache::(); let program1 = Pubkey::new_unique(); - let env = Arc::new(BuiltinProgram::new_mock()); + let env = cache.environments.program_runtime_v1.clone(); + set_tombstone( &mut cache, program1, 10, - LoadedProgramType::FailedVerification(env), + LoadedProgramType::FailedVerification(env.clone()), ); + assert_eq!(cache.entries.get(&program1).unwrap().slot_versions.len(), 1); + set_tombstone(&mut cache, program1, 10, LoadedProgramType::Closed); + assert_eq!(cache.entries.get(&program1).unwrap().slot_versions.len(), 1); + set_tombstone( + &mut cache, + program1, + 10, + LoadedProgramType::FailedVerification(env.clone()), + ); + assert_eq!(cache.entries.get(&program1).unwrap().slot_versions.len(), 1); - let loaded_program = new_test_loaded_program(10, 10); - let (existing, program) = cache.replenish(program1, loaded_program.clone()); - assert!(!existing); - assert_eq!(program, loaded_program); + // Fail on exact replacement + assert!(cache.assign_program( + program1, + Arc::new(LoadedProgram::new_tombstone( + 10, + LoadedProgramType::FailedVerification(env) + )) + )); } #[test] @@ -1726,11 +1754,7 @@ mod tests { // Add a program at slot 50, and a tombstone for the program at slot 60 let program2 = Pubkey::new_unique(); - assert!( - !cache - .replenish(program2, new_test_builtin_program(50, 51)) - .0 - ); + cache.assign_program(program2, new_test_builtin_program(50, 51)); let second_level = &cache .entries .get(&program2) @@ -1830,10 +1854,7 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - let loaded_program = new_test_loaded_program(10, 10); - let (existing, program) = cache.replenish(program1, loaded_program.clone()); - assert!(!existing); - assert_eq!(program, loaded_program); + cache.assign_program(program1, new_test_loaded_program(10, 10)); let new_env = Arc::new(BuiltinProgram::new_mock()); cache.upcoming_environments = Some(ProgramRuntimeEnvironments { @@ -1849,9 +1870,7 @@ mod tests { ix_usage_counter: AtomicU64::default(), latest_access_slot: AtomicU64::default(), }); - let (existing, program) = cache.replenish(program1, updated_program.clone()); - assert!(!existing); - assert_eq!(program, updated_program); + cache.assign_program(program1, updated_program.clone()); // Test that there are 2 entries for the program assert_eq!( @@ -1986,38 +2005,27 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(10, 11)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0); - - // Test: inserting duplicate entry return pre existing entry from the cache - assert!(cache.replenish(program1, new_test_loaded_program(20, 21)).0); + cache.assign_program(program1, new_test_loaded_program(0, 1)); + cache.assign_program(program1, new_test_loaded_program(10, 11)); + cache.assign_program(program1, new_test_loaded_program(20, 21)); let program2 = Pubkey::new_unique(); - assert!(!cache.replenish(program2, new_test_loaded_program(5, 6)).0); - assert!( - !cache - .replenish( - program2, - new_test_loaded_program(11, 11 + DELAY_VISIBILITY_SLOT_OFFSET) - ) - .0 + cache.assign_program(program2, new_test_loaded_program(5, 6)); + cache.assign_program( + program2, + new_test_loaded_program(11, 11 + DELAY_VISIBILITY_SLOT_OFFSET), ); let program3 = Pubkey::new_unique(); - assert!(!cache.replenish(program3, new_test_loaded_program(25, 26)).0); + cache.assign_program(program3, new_test_loaded_program(25, 26)); let program4 = Pubkey::new_unique(); - assert!(!cache.replenish(program4, new_test_loaded_program(0, 1)).0); - assert!(!cache.replenish(program4, new_test_loaded_program(5, 6)).0); + cache.assign_program(program4, new_test_loaded_program(0, 1)); + cache.assign_program(program4, new_test_loaded_program(5, 6)); // The following is a special case, where effective slot is 3 slots in the future - assert!( - !cache - .replenish( - program4, - new_test_loaded_program(15, 15 + DELAY_VISIBILITY_SLOT_OFFSET) - ) - .0 + cache.assign_program( + program4, + new_test_loaded_program(15, 15 + DELAY_VISIBILITY_SLOT_OFFSET), ); // Current fork graph @@ -2243,15 +2251,15 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0); + cache.assign_program(program1, new_test_loaded_program(0, 1)); + cache.assign_program(program1, new_test_loaded_program(20, 21)); let program2 = Pubkey::new_unique(); - assert!(!cache.replenish(program2, new_test_loaded_program(5, 6)).0); - assert!(!cache.replenish(program2, new_test_loaded_program(11, 12)).0); + cache.assign_program(program2, new_test_loaded_program(5, 6)); + cache.assign_program(program2, new_test_loaded_program(11, 12)); let program3 = Pubkey::new_unique(); - assert!(!cache.replenish(program3, new_test_loaded_program(25, 26)).0); + cache.assign_program(program3, new_test_loaded_program(25, 26)); // Testing fork 0 - 5 - 11 - 15 - 16 - 19 - 21 - 23 with current slot at 19 let mut missing = vec![ @@ -2316,12 +2324,12 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0); + cache.assign_program(program1, new_test_loaded_program(0, 1)); + cache.assign_program(program1, new_test_loaded_program(20, 21)); let program2 = Pubkey::new_unique(); - assert!(!cache.replenish(program2, new_test_loaded_program(5, 6)).0); - assert!(!cache.replenish(program2, new_test_loaded_program(11, 12)).0); + cache.assign_program(program2, new_test_loaded_program(5, 6)); + cache.assign_program(program2, new_test_loaded_program(11, 12)); let program3 = Pubkey::new_unique(); // Insert an unloaded program with correct/cache's environment at slot 25 @@ -2330,17 +2338,13 @@ mod tests { // Insert another unloaded program with a different environment at slot 20 // Since this entry's environment won't match cache's environment, looking up this // entry should return missing instead of unloaded entry. - assert!( - !cache - .replenish( - program3, - Arc::new( - new_test_loaded_program(20, 21) - .to_unloaded() - .expect("Failed to create unloaded program") - ) - ) - .0 + cache.assign_program( + program3, + Arc::new( + new_test_loaded_program(20, 21) + .to_unloaded() + .expect("Failed to create unloaded program"), + ), ); // Testing fork 0 - 5 - 11 - 15 - 16 - 19 - 21 - 23 with current slot at 19 @@ -2407,8 +2411,8 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(5, 6)).0); + cache.assign_program(program1, new_test_loaded_program(0, 1)); + cache.assign_program(program1, new_test_loaded_program(5, 6)); cache.prune(10, 0); @@ -2447,11 +2451,11 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(5, 6)).0); + cache.assign_program(program1, new_test_loaded_program(0, 1)); + cache.assign_program(program1, new_test_loaded_program(5, 6)); let program2 = Pubkey::new_unique(); - assert!(!cache.replenish(program2, new_test_loaded_program(10, 11)).0); + cache.assign_program(program2, new_test_loaded_program(10, 11)); let mut missing = vec![ (program1, (LoadedProgramMatchCriteria::NoCriteria, 1)), diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index f2722983d..1a9f1d8bb 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1363,7 +1363,7 @@ impl Bank { drop(loaded_programs_cache); let recompiled = new.load_program(&key, false, Some(program_to_recompile)); let mut loaded_programs_cache = new.loaded_programs_cache.write().unwrap(); - loaded_programs_cache.replenish(key, recompiled); + loaded_programs_cache.assign_program(key, recompiled); } } else if new.epoch() != loaded_programs_cache.latest_root_epoch || slot_index.saturating_add(slots_in_recompilation_phase) >= slots_in_epoch @@ -7056,7 +7056,7 @@ impl Bank { self.loaded_programs_cache .write() .unwrap() - .replenish(program_id, Arc::new(builtin)); + .assign_program(program_id, Arc::new(builtin)); debug!("Added program {} under {:?}", name, program_id); }