Cleanup - `load_and_get_programs_from_cache()` (#30959)

* Replaces assign_program() by replenish() in load_and_get_programs_from_cache().

* Replaces LoadedProgramEntry by a boolean.

* Only lock the global cache once all missing programs have been loaded

* Factors out call of self.rc.accounts.filter_executable_program_accounts().
This commit is contained in:
Alexander Meißner 2023-03-29 16:11:14 +02:00 committed by GitHub
parent 4bd7de4887
commit a6ad37f3ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 89 additions and 190 deletions

View File

@ -237,63 +237,37 @@ impl solana_frozen_abi::abi_example::AbiExample for LoadedPrograms {
}
}
pub enum LoadedProgramEntry {
WasOccupied(Arc<LoadedProgram>),
WasVacant(Arc<LoadedProgram>),
}
impl LoadedPrograms {
/// Refill the cache with a single entry. It's typically called during transaction processing,
/// Refill the cache with a single entry. It's typically called during transaction loading,
/// when the cache doesn't contain the entry corresponding to program `key`.
/// The function dedupes the cache, in case some other thread replenished the the entry in parallel.
pub fn replenish(&mut self, key: Pubkey, entry: Arc<LoadedProgram>) -> LoadedProgramEntry {
/// The function dedupes the cache, in case some other thread replenished the entry in parallel.
pub fn replenish(
&mut self,
key: Pubkey,
entry: Arc<LoadedProgram>,
) -> (bool, Arc<LoadedProgram>) {
let second_level = self.entries.entry(key).or_insert_with(Vec::new);
let index = second_level
.iter()
.position(|at| at.effective_slot >= entry.effective_slot);
if let Some(index) = index {
let existing = second_level
.get(index)
.expect("Missing entry, even though position was found");
if let Some(existing) = index.and_then(|index| second_level.get(index)) {
if existing.deployment_slot == entry.deployment_slot
&& existing.effective_slot == entry.effective_slot
{
return LoadedProgramEntry::WasOccupied(existing.clone());
return (true, existing.clone());
}
}
second_level.insert(index.unwrap_or(second_level.len()), entry.clone());
LoadedProgramEntry::WasVacant(entry)
(false, entry)
}
/// Assign the program `entry` to the given `key` in the cache.
/// This is typically called when a deployed program is managed (upgraded/un/reddeployed) via
/// bpf loader instructions.
/// The program management is not expected to overlap with initial program deployment slot.
/// Note: Do not call this function to replenish cache with a missing entry. As that use-case can
/// cause the cache to have duplicates. Use `replenish()` API for that use-case.
/// This is typically called when a deployed program is managed (un-/re-/deployed) via
/// loader instructions. Because of the cooldown, entires can not have the same
/// deployment_slot and effective_slot.
pub fn assign_program(&mut self, key: Pubkey, entry: Arc<LoadedProgram>) -> Arc<LoadedProgram> {
let second_level = self.entries.entry(key).or_insert_with(Vec::new);
let index = second_level
.iter()
.position(|at| at.effective_slot >= entry.effective_slot);
if let Some(index) = index {
let existing = second_level
.get(index)
.expect("Missing entry, even though position was found");
if existing.is_tombstone()
&& entry.is_tombstone()
&& existing.deployment_slot == entry.deployment_slot
{
// If there's already a tombstone for the program at the given slot, let's return
// the existing entry instead of adding another.
return existing.clone();
}
debug_assert!(
existing.deployment_slot != entry.deployment_slot
|| existing.effective_slot != entry.effective_slot
);
}
second_level.insert(index.unwrap_or(second_level.len()), entry.clone());
let (was_occupied, entry) = self.replenish(key, entry);
debug_assert!(!was_occupied);
entry
}
@ -391,8 +365,7 @@ impl LoadedPrograms {
mod tests {
use {
crate::loaded_programs::{
BlockRelation, ForkGraph, LoadedProgram, LoadedProgramEntry, LoadedProgramType,
LoadedPrograms, WorkingSlot,
BlockRelation, ForkGraph, LoadedProgram, LoadedProgramType, LoadedPrograms, WorkingSlot,
},
solana_rbpf::vm::BuiltInProgram,
solana_sdk::{clock::Slot, pubkey::Pubkey},
@ -694,10 +667,11 @@ mod tests {
// Add a program at slot 50, and a tombstone for the program at slot 60
let program2 = Pubkey::new_unique();
assert!(matches!(
cache.replenish(program2, new_test_builtin_program(50, 51)),
LoadedProgramEntry::WasVacant(_)
));
assert!(
!cache
.replenish(program2, new_test_builtin_program(50, 51))
.0
);
let second_level = &cache
.entries
.get(&program2)
@ -910,55 +884,25 @@ mod tests {
fork_graph.insert_fork(&[0, 5, 11, 25, 27]);
let program1 = Pubkey::new_unique();
assert!(matches!(
cache.replenish(program1, new_test_loaded_program(0, 1)),
LoadedProgramEntry::WasVacant(_)
));
assert!(matches!(
cache.replenish(program1, new_test_loaded_program(10, 11)),
LoadedProgramEntry::WasVacant(_)
));
assert!(matches!(
cache.replenish(program1, new_test_loaded_program(20, 21)),
LoadedProgramEntry::WasVacant(_)
));
assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0);
assert!(!cache.replenish(program1, new_test_loaded_program(10, 11)).0);
assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0);
// Test: inserting duplicate entry return pre existing entry from the cache
assert!(matches!(
cache.replenish(program1, new_test_loaded_program(20, 21)),
LoadedProgramEntry::WasOccupied(_)
));
assert!(cache.replenish(program1, new_test_loaded_program(20, 21)).0);
let program2 = Pubkey::new_unique();
assert!(matches!(
cache.replenish(program2, new_test_loaded_program(5, 6)),
LoadedProgramEntry::WasVacant(_)
));
assert!(matches!(
cache.replenish(program2, new_test_loaded_program(11, 12)),
LoadedProgramEntry::WasVacant(_)
));
assert!(!cache.replenish(program2, new_test_loaded_program(5, 6)).0);
assert!(!cache.replenish(program2, new_test_loaded_program(11, 12)).0);
let program3 = Pubkey::new_unique();
assert!(matches!(
cache.replenish(program3, new_test_loaded_program(25, 26)),
LoadedProgramEntry::WasVacant(_)
));
assert!(!cache.replenish(program3, new_test_loaded_program(25, 26)).0);
let program4 = Pubkey::new_unique();
assert!(matches!(
cache.replenish(program4, new_test_loaded_program(0, 1)),
LoadedProgramEntry::WasVacant(_)
));
assert!(matches!(
cache.replenish(program4, new_test_loaded_program(5, 6)),
LoadedProgramEntry::WasVacant(_)
));
assert!(!cache.replenish(program4, new_test_loaded_program(0, 1)).0);
assert!(!cache.replenish(program4, new_test_loaded_program(5, 6)).0);
// The following is a special case, where effective slot is 4 slots in the future
assert!(matches!(
cache.replenish(program4, new_test_loaded_program(15, 19)),
LoadedProgramEntry::WasVacant(_)
));
assert!(!cache.replenish(program4, new_test_loaded_program(15, 19)).0);
// Current fork graph
// 0

View File

@ -95,9 +95,7 @@ use {
compute_budget::{self, ComputeBudget},
executor_cache::{BankExecutorCache, TransactionExecutorCache, MAX_CACHED_EXECUTORS},
invoke_context::{BuiltinProgram, ProcessInstructionWithContext},
loaded_programs::{
LoadedProgram, LoadedProgramEntry, LoadedProgramType, LoadedPrograms, WorkingSlot,
},
loaded_programs::{LoadedProgram, LoadedProgramType, LoadedPrograms, WorkingSlot},
log_collector::LogCollector,
sysvar_cache::SysvarCache,
timings::{ExecuteTimingType, ExecuteTimings},
@ -4386,88 +4384,47 @@ impl Bank {
}
#[allow(dead_code)] // Preparation for BankExecutorCache rework
fn load_and_get_programs_from_cache<'a>(
fn load_and_get_programs_from_cache(
&self,
program_owners: &[&'a Pubkey],
sanitized_txs: &[SanitizedTransaction],
check_results: &mut [TransactionCheckResult],
) -> (
HashMap<Pubkey, &'a Pubkey>,
HashMap<Pubkey, Arc<LoadedProgram>>,
) {
let mut filter_programs_time = Measure::start("filter_programs_accounts");
let program_accounts_map = self.rc.accounts.filter_executable_program_accounts(
&self.ancestors,
sanitized_txs,
check_results,
program_owners,
&self.blockhash_queue.read().unwrap(),
);
filter_programs_time.stop();
program_accounts_map: &HashMap<Pubkey, &Pubkey>,
) -> HashMap<Pubkey, Arc<LoadedProgram>> {
let (mut loaded_programs_for_txs, missing_programs) = {
// Lock the global cache to figure out which programs need to be loaded
let loaded_programs_cache = self.loaded_programs_cache.read().unwrap();
loaded_programs_cache.extract(self, program_accounts_map.keys().cloned())
};
let mut filter_missing_programs_time = Measure::start("filter_missing_programs_accounts");
let (mut loaded_programs_for_txs, missing_programs) = self
.loaded_programs_cache
.read()
.unwrap()
.extract(self, program_accounts_map.keys().cloned());
filter_missing_programs_time.stop();
missing_programs
// Load missing programs while global cache is unlocked
let missing_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = missing_programs
.iter()
.for_each(|pubkey| match self.load_program(pubkey) {
Ok(program) => {
match self
.loaded_programs_cache
.write()
.unwrap()
.replenish(*pubkey, program)
{
LoadedProgramEntry::WasOccupied(entry) => {
loaded_programs_for_txs.insert(*pubkey, entry);
}
LoadedProgramEntry::WasVacant(new_entry) => {
loaded_programs_for_txs.insert(*pubkey, new_entry);
}
}
}
Err(e) => {
.map(|key| {
let program = self.load_program(key).unwrap_or_else(|err| {
// Create a tombstone for the program in the cache
debug!("Failed to load program {}, error {:?}", pubkey, e);
let tombstone = self.loaded_programs_cache.write().unwrap().assign_program(
*pubkey,
debug!("Failed to load program {}, error {:?}", key, err);
Arc::new(LoadedProgram::new_tombstone(
self.slot,
LoadedProgramType::FailedVerification,
)),
);
loaded_programs_for_txs.insert(*pubkey, tombstone);
}
))
});
(*key, program)
})
.collect();
(program_accounts_map, loaded_programs_for_txs)
// Lock the global cache again to replenish the missing programs
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
for (key, program) in missing_programs {
let (_was_occupied, entry) = loaded_programs_cache.replenish(key, program);
// Use the returned entry as that might have been deduplicated globally
loaded_programs_for_txs.insert(key, entry);
}
fn replenish_executor_cache<'a>(
&self,
program_owners: &[&'a Pubkey],
sanitized_txs: &[SanitizedTransaction],
check_results: &mut [TransactionCheckResult],
) -> (
HashMap<Pubkey, &'a Pubkey>,
HashMap<Pubkey, Arc<LoadedProgram>>,
) {
let mut filter_programs_time = Measure::start("filter_programs_accounts");
let program_accounts_map = self.rc.accounts.filter_executable_program_accounts(
&self.ancestors,
sanitized_txs,
check_results,
program_owners,
&self.blockhash_queue.read().unwrap(),
);
filter_programs_time.stop();
loaded_programs_for_txs
}
fn replenish_executor_cache(
&self,
program_accounts_map: &HashMap<Pubkey, &Pubkey>,
) -> HashMap<Pubkey, Arc<LoadedProgram>> {
let mut loaded_programs_for_txs = HashMap::new();
let mut filter_missing_programs_time = Measure::start("filter_missing_programs_accounts");
let missing_executors = program_accounts_map
@ -4487,22 +4444,17 @@ impl Bank {
.collect::<Vec<_>>();
filter_missing_programs_time.stop();
let executors = missing_executors
.iter()
.map(|pubkey| match self.load_program(pubkey) {
Ok(program) => {
loaded_programs_for_txs.insert(**pubkey, program.clone());
(**pubkey, program)
}
// Create a tombstone for the programs that failed to load
Err(_) => {
let tombstone = Arc::new(LoadedProgram::new_tombstone(
let executors = missing_executors.iter().map(|pubkey| {
let program = self.load_program(pubkey).unwrap_or_else(|err| {
// Create a tombstone for the program in the cache
debug!("Failed to load program {}, error {:?}", pubkey, err);
Arc::new(LoadedProgram::new_tombstone(
self.slot,
LoadedProgramType::FailedVerification,
));
loaded_programs_for_txs.insert(**pubkey, tombstone.clone());
(**pubkey, tombstone)
}
))
});
loaded_programs_for_txs.insert(**pubkey, program.clone());
(**pubkey, program)
});
// avoid locking the cache if there are no new executors
@ -4510,7 +4462,7 @@ impl Bank {
self.executor_cache.write().unwrap().put(executors);
}
(program_accounts_map, loaded_programs_for_txs)
loaded_programs_for_txs
}
#[allow(clippy::type_complexity)]
@ -4580,19 +4532,22 @@ impl Bank {
bpf_loader::id(),
bpf_loader_deprecated::id(),
];
let program_owners_refs: Vec<&Pubkey> = program_owners.iter().collect();
let program_accounts_map = self.rc.accounts.filter_executable_program_accounts(
&self.ancestors,
sanitized_txs,
&mut check_results,
&program_owners_refs,
&self.blockhash_queue.read().unwrap(),
);
// The following code is currently commented out. This is how the new cache will
// finally be used, once rest of the code blocks are in place.
/*
let (program_accounts_map, loaded_programs_map) = self.load_and_get_programs_from_cache(
&program_owners_refs,
sanitized_txs,
&check_results,
);
let loaded_programs_map =
self.load_and_get_programs_from_cache(&program_accounts_map);
*/
let (executable_programs_in_tx_batch, loaded_programs_map) =
self.replenish_executor_cache(&program_owners_refs, sanitized_txs, &mut check_results);
let loaded_programs_map = self.replenish_executor_cache(&program_accounts_map);
let tx_executor_cache = Rc::new(RefCell::new(TransactionExecutorCache::new(
loaded_programs_map.clone().into_iter(),
@ -4609,7 +4564,7 @@ impl Bank {
&self.feature_set,
&self.fee_structure,
account_overrides,
&executable_programs_in_tx_batch,
&program_accounts_map,
&loaded_programs_map,
);
load_time.stop();