Find and load missing programs in LoadedPrograms cache (#30275)
* Find and load missing programs in LoadedPrograms cache - filter program accounts in a transaction batch - filter the accounts that are missing in LoadedPrograms cache - load the programs before processing the transactions - unit tests * address review comments * fix clippy * address review comments * fix test * fix more tests
This commit is contained in:
parent
b0f7b782d3
commit
b1f5b0d790
|
@ -41,9 +41,9 @@ impl TransactionExecutorCache {
|
|||
self.visible.get(key).cloned()
|
||||
}
|
||||
|
||||
pub fn set_tombstone(&mut self, key: Pubkey) {
|
||||
pub fn set_tombstone(&mut self, key: Pubkey, slot: Slot) {
|
||||
self.visible
|
||||
.insert(key, Arc::new(LoadedProgram::new_tombstone()));
|
||||
.insert(key, Arc::new(LoadedProgram::new_tombstone(slot)));
|
||||
}
|
||||
|
||||
pub fn set(
|
||||
|
@ -52,12 +52,13 @@ impl TransactionExecutorCache {
|
|||
executor: Arc<LoadedProgram>,
|
||||
upgrade: bool,
|
||||
delay_visibility_of_program_deployment: bool,
|
||||
current_slot: Slot,
|
||||
) {
|
||||
if upgrade {
|
||||
if delay_visibility_of_program_deployment {
|
||||
// Place a tombstone in the cache so that
|
||||
// we don't load the new version from the database as it should remain invisible
|
||||
self.set_tombstone(key);
|
||||
self.set_tombstone(key, current_slot);
|
||||
} else {
|
||||
self.visible.insert(key, executor.clone());
|
||||
}
|
||||
|
|
|
@ -180,12 +180,12 @@ impl LoadedProgram {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn new_tombstone() -> Self {
|
||||
pub fn new_tombstone(slot: Slot) -> Self {
|
||||
Self {
|
||||
program: LoadedProgramType::Invalid,
|
||||
account_size: 0,
|
||||
deployment_slot: 0,
|
||||
effective_slot: 0,
|
||||
deployment_slot: slot,
|
||||
effective_slot: slot,
|
||||
usage_counter: AtomicU64::default(),
|
||||
}
|
||||
}
|
||||
|
@ -219,8 +219,10 @@ pub enum LoadedProgramEntry {
|
|||
}
|
||||
|
||||
impl LoadedPrograms {
|
||||
/// Inserts a single entry
|
||||
pub fn insert_entry(&mut self, key: Pubkey, entry: LoadedProgram) -> LoadedProgramEntry {
|
||||
/// Refill the cache with a single entry. It's typically called during transaction processing,
|
||||
/// when the cache doesn't contain the entry corresponding to program `key`.
|
||||
/// The function dedupes the cache, in case some other thread replenished the the entry in parallel.
|
||||
pub fn replenish(&mut self, key: Pubkey, entry: Arc<LoadedProgram>) -> LoadedProgramEntry {
|
||||
let second_level = self.entries.entry(key).or_insert_with(Vec::new);
|
||||
let index = second_level
|
||||
.iter()
|
||||
|
@ -235,9 +237,32 @@ impl LoadedPrograms {
|
|||
return LoadedProgramEntry::WasOccupied(existing.clone());
|
||||
}
|
||||
}
|
||||
let new_entry = Arc::new(entry);
|
||||
second_level.insert(index.unwrap_or(second_level.len()), new_entry.clone());
|
||||
LoadedProgramEntry::WasVacant(new_entry)
|
||||
second_level.insert(index.unwrap_or(second_level.len()), entry.clone());
|
||||
LoadedProgramEntry::WasVacant(entry)
|
||||
}
|
||||
|
||||
/// Assign the program `entry` to the given `key` in the cache.
|
||||
/// This is typically called when a deployed program is managed (upgraded/un/reddeployed) via
|
||||
/// bpf loader instructions.
|
||||
/// The program management is not expected to overlap with initial program deployment slot.
|
||||
/// Note: Do not call this function to replenish cache with a missing entry. As that use-case can
|
||||
/// cause the cache to have duplicates. Use `replenish()` API for that use-case.
|
||||
pub fn assign_program(&mut self, key: Pubkey, entry: Arc<LoadedProgram>) -> Arc<LoadedProgram> {
|
||||
let second_level = self.entries.entry(key).or_insert_with(Vec::new);
|
||||
let index = second_level
|
||||
.iter()
|
||||
.position(|at| at.effective_slot >= entry.effective_slot);
|
||||
if let Some(index) = index {
|
||||
let existing = second_level
|
||||
.get(index)
|
||||
.expect("Missing entry, even though position was found");
|
||||
assert!(
|
||||
existing.deployment_slot != entry.deployment_slot
|
||||
|| existing.effective_slot != entry.effective_slot
|
||||
);
|
||||
}
|
||||
second_level.insert(index.unwrap_or(second_level.len()), entry.clone());
|
||||
entry
|
||||
}
|
||||
|
||||
/// Before rerooting the blockstore this removes all programs of orphan forks
|
||||
|
@ -310,6 +335,7 @@ mod tests {
|
|||
BlockRelation, ForkGraph, LoadedProgram, LoadedProgramEntry, LoadedProgramType,
|
||||
LoadedPrograms, WorkingSlot,
|
||||
},
|
||||
solana_rbpf::vm::BuiltInProgram,
|
||||
solana_sdk::{clock::Slot, pubkey::Pubkey},
|
||||
std::{
|
||||
collections::HashMap,
|
||||
|
@ -318,11 +344,70 @@ mod tests {
|
|||
},
|
||||
};
|
||||
|
||||
fn new_test_builtin_program(deployment_slot: Slot, effective_slot: Slot) -> Arc<LoadedProgram> {
|
||||
Arc::new(LoadedProgram {
|
||||
program: LoadedProgramType::BuiltIn(BuiltInProgram::default()),
|
||||
account_size: 0,
|
||||
deployment_slot,
|
||||
effective_slot,
|
||||
usage_counter: AtomicU64::default(),
|
||||
})
|
||||
}
|
||||
|
||||
fn set_tombstone(cache: &mut LoadedPrograms, key: Pubkey, slot: Slot) -> Arc<LoadedProgram> {
|
||||
cache.assign_program(key, Arc::new(LoadedProgram::new_tombstone(slot)))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tombstone() {
|
||||
let tombstone = LoadedProgram::new_tombstone();
|
||||
let tombstone = LoadedProgram::new_tombstone(0);
|
||||
assert!(matches!(tombstone.program, LoadedProgramType::Invalid));
|
||||
assert!(tombstone.is_tombstone());
|
||||
assert_eq!(tombstone.deployment_slot, 0);
|
||||
assert_eq!(tombstone.effective_slot, 0);
|
||||
|
||||
let tombstone = LoadedProgram::new_tombstone(100);
|
||||
assert!(matches!(tombstone.program, LoadedProgramType::Invalid));
|
||||
assert!(tombstone.is_tombstone());
|
||||
assert_eq!(tombstone.deployment_slot, 100);
|
||||
assert_eq!(tombstone.effective_slot, 100);
|
||||
|
||||
let mut cache = LoadedPrograms::default();
|
||||
let program1 = Pubkey::new_unique();
|
||||
let tombstone = set_tombstone(&mut cache, program1, 10);
|
||||
let second_level = &cache
|
||||
.entries
|
||||
.get(&program1)
|
||||
.expect("Failed to find the entry");
|
||||
assert_eq!(second_level.len(), 1);
|
||||
assert!(second_level.get(0).unwrap().is_tombstone());
|
||||
assert_eq!(tombstone.deployment_slot, 10);
|
||||
assert_eq!(tombstone.effective_slot, 10);
|
||||
|
||||
// Add a program at slot 50, and a tombstone for the program at slot 60
|
||||
let program2 = Pubkey::new_unique();
|
||||
assert!(matches!(
|
||||
cache.replenish(program2, new_test_builtin_program(50, 51)),
|
||||
LoadedProgramEntry::WasVacant(_)
|
||||
));
|
||||
let second_level = &cache
|
||||
.entries
|
||||
.get(&program2)
|
||||
.expect("Failed to find the entry");
|
||||
assert_eq!(second_level.len(), 1);
|
||||
assert!(!second_level.get(0).unwrap().is_tombstone());
|
||||
|
||||
let tombstone = set_tombstone(&mut cache, program2, 60);
|
||||
let second_level = &cache
|
||||
.entries
|
||||
.get(&program2)
|
||||
.expect("Failed to find the entry");
|
||||
assert_eq!(second_level.len(), 2);
|
||||
assert!(!second_level.get(0).unwrap().is_tombstone());
|
||||
assert!(second_level.get(1).unwrap().is_tombstone());
|
||||
assert!(tombstone.is_tombstone());
|
||||
assert_eq!(tombstone.deployment_slot, 60);
|
||||
assert_eq!(tombstone.effective_slot, 60);
|
||||
}
|
||||
|
||||
struct TestForkGraph {
|
||||
|
@ -464,14 +549,14 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
fn new_test_loaded_program(deployment_slot: Slot, effective_slot: Slot) -> LoadedProgram {
|
||||
LoadedProgram {
|
||||
fn new_test_loaded_program(deployment_slot: Slot, effective_slot: Slot) -> Arc<LoadedProgram> {
|
||||
Arc::new(LoadedProgram {
|
||||
program: LoadedProgramType::Invalid,
|
||||
account_size: 0,
|
||||
deployment_slot,
|
||||
effective_slot,
|
||||
usage_counter: AtomicU64::default(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn match_slot(
|
||||
|
@ -511,52 +596,52 @@ mod tests {
|
|||
|
||||
let program1 = Pubkey::new_unique();
|
||||
assert!(matches!(
|
||||
cache.insert_entry(program1, new_test_loaded_program(0, 1)),
|
||||
cache.replenish(program1, new_test_loaded_program(0, 1)),
|
||||
LoadedProgramEntry::WasVacant(_)
|
||||
));
|
||||
assert!(matches!(
|
||||
cache.insert_entry(program1, new_test_loaded_program(10, 11)),
|
||||
cache.replenish(program1, new_test_loaded_program(10, 11)),
|
||||
LoadedProgramEntry::WasVacant(_)
|
||||
));
|
||||
assert!(matches!(
|
||||
cache.insert_entry(program1, new_test_loaded_program(20, 21)),
|
||||
cache.replenish(program1, new_test_loaded_program(20, 21)),
|
||||
LoadedProgramEntry::WasVacant(_)
|
||||
));
|
||||
|
||||
// Test: inserting duplicate entry return pre existing entry from the cache
|
||||
assert!(matches!(
|
||||
cache.insert_entry(program1, new_test_loaded_program(20, 21)),
|
||||
cache.replenish(program1, new_test_loaded_program(20, 21)),
|
||||
LoadedProgramEntry::WasOccupied(_)
|
||||
));
|
||||
|
||||
let program2 = Pubkey::new_unique();
|
||||
assert!(matches!(
|
||||
cache.insert_entry(program2, new_test_loaded_program(5, 6)),
|
||||
cache.replenish(program2, new_test_loaded_program(5, 6)),
|
||||
LoadedProgramEntry::WasVacant(_)
|
||||
));
|
||||
assert!(matches!(
|
||||
cache.insert_entry(program2, new_test_loaded_program(11, 12)),
|
||||
cache.replenish(program2, new_test_loaded_program(11, 12)),
|
||||
LoadedProgramEntry::WasVacant(_)
|
||||
));
|
||||
|
||||
let program3 = Pubkey::new_unique();
|
||||
assert!(matches!(
|
||||
cache.insert_entry(program3, new_test_loaded_program(25, 26)),
|
||||
cache.replenish(program3, new_test_loaded_program(25, 26)),
|
||||
LoadedProgramEntry::WasVacant(_)
|
||||
));
|
||||
|
||||
let program4 = Pubkey::new_unique();
|
||||
assert!(matches!(
|
||||
cache.insert_entry(program4, new_test_loaded_program(0, 1)),
|
||||
cache.replenish(program4, new_test_loaded_program(0, 1)),
|
||||
LoadedProgramEntry::WasVacant(_)
|
||||
));
|
||||
assert!(matches!(
|
||||
cache.insert_entry(program4, new_test_loaded_program(5, 6)),
|
||||
cache.replenish(program4, new_test_loaded_program(5, 6)),
|
||||
LoadedProgramEntry::WasVacant(_)
|
||||
));
|
||||
// The following is a special case, where effective slot is 4 slots in the future
|
||||
assert!(matches!(
|
||||
cache.insert_entry(program4, new_test_loaded_program(15, 19)),
|
||||
cache.replenish(program4, new_test_loaded_program(15, 19)),
|
||||
LoadedProgramEntry::WasVacant(_)
|
||||
));
|
||||
|
||||
|
|
|
@ -258,6 +258,7 @@ pub fn load_program_from_account(
|
|||
loaded_program.clone(),
|
||||
false,
|
||||
feature_set.is_active(&delay_visibility_of_program_deployment::id()),
|
||||
deployment_slot,
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -291,6 +292,7 @@ macro_rules! deploy_program {
|
|||
Arc::new(executor),
|
||||
true,
|
||||
delay_visibility_of_program_deployment,
|
||||
$slot,
|
||||
);
|
||||
}};
|
||||
}
|
||||
|
@ -1183,10 +1185,11 @@ fn process_loader_upgradeable_instruction(
|
|||
.feature_set
|
||||
.is_active(&delay_visibility_of_program_deployment::id())
|
||||
{
|
||||
let clock = invoke_context.get_sysvar_cache().get_clock()?;
|
||||
invoke_context
|
||||
.tx_executor_cache
|
||||
.borrow_mut()
|
||||
.set_tombstone(program_key);
|
||||
.set_tombstone(program_key, clock.slot);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
|
|
|
@ -94,7 +94,7 @@ use {
|
|||
compute_budget::{self, ComputeBudget},
|
||||
executor_cache::{BankExecutorCache, TransactionExecutorCache, MAX_CACHED_EXECUTORS},
|
||||
invoke_context::{BuiltinProgram, ProcessInstructionWithContext},
|
||||
loaded_programs::{LoadedProgram, LoadedPrograms, WorkingSlot},
|
||||
loaded_programs::{LoadedProgram, LoadedProgramEntry, LoadedPrograms, WorkingSlot},
|
||||
log_collector::LogCollector,
|
||||
sysvar_cache::SysvarCache,
|
||||
timings::{ExecuteTimingType, ExecuteTimings},
|
||||
|
@ -105,6 +105,7 @@ use {
|
|||
AccountSharedData, InheritableAccountFields, ReadableAccount, WritableAccount,
|
||||
},
|
||||
account_utils::StateMut,
|
||||
bpf_loader, bpf_loader_deprecated,
|
||||
bpf_loader_upgradeable::{self, UpgradeableLoaderState},
|
||||
clock::{
|
||||
BankId, Epoch, Slot, SlotCount, SlotIndex, UnixTimestamp, DEFAULT_HASHES_PER_TICK,
|
||||
|
@ -4338,6 +4339,68 @@ impl Bank {
|
|||
|| self.cluster_type() != ClusterType::MainnetBeta
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // Preparation for BankExecutorCache rework
|
||||
fn load_and_get_programs_from_cache<'a>(
|
||||
&self,
|
||||
program_owners: &[&'a Pubkey],
|
||||
sanitized_txs: &[SanitizedTransaction],
|
||||
check_results: &mut [TransactionCheckResult],
|
||||
) -> (
|
||||
HashMap<Pubkey, &'a Pubkey>,
|
||||
HashMap<Pubkey, Arc<LoadedProgram>>,
|
||||
) {
|
||||
let mut filter_programs_time = Measure::start("filter_programs_accounts");
|
||||
let program_accounts_map = self.rc.accounts.filter_executable_program_accounts(
|
||||
&self.ancestors,
|
||||
sanitized_txs,
|
||||
check_results,
|
||||
program_owners,
|
||||
&self.blockhash_queue.read().unwrap(),
|
||||
);
|
||||
filter_programs_time.stop();
|
||||
|
||||
let mut filter_missing_programs_time = Measure::start("filter_missing_programs_accounts");
|
||||
let (mut loaded_programs_for_txs, missing_programs) = self
|
||||
.loaded_programs_cache
|
||||
.read()
|
||||
.unwrap()
|
||||
.extract(self, program_accounts_map.keys().cloned());
|
||||
filter_missing_programs_time.stop();
|
||||
|
||||
missing_programs
|
||||
.iter()
|
||||
.for_each(|pubkey| match self.load_program(pubkey) {
|
||||
Ok(program) => {
|
||||
match self
|
||||
.loaded_programs_cache
|
||||
.write()
|
||||
.unwrap()
|
||||
.replenish(*pubkey, program)
|
||||
{
|
||||
LoadedProgramEntry::WasOccupied(entry) => {
|
||||
loaded_programs_for_txs.insert(*pubkey, entry);
|
||||
}
|
||||
LoadedProgramEntry::WasVacant(new_entry) => {
|
||||
loaded_programs_for_txs.insert(*pubkey, new_entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(e) => {
|
||||
// Create a tombstone for the program in the cache
|
||||
debug!("Failed to load program {}, error {:?}", pubkey, e);
|
||||
let tombstone = self
|
||||
.loaded_programs_cache
|
||||
.write()
|
||||
.unwrap()
|
||||
.assign_program(*pubkey, Arc::new(LoadedProgram::new_tombstone(self.slot)));
|
||||
loaded_programs_for_txs.insert(*pubkey, tombstone);
|
||||
}
|
||||
});
|
||||
|
||||
(program_accounts_map, loaded_programs_for_txs)
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn load_and_execute_transactions(
|
||||
&self,
|
||||
|
@ -4400,6 +4463,24 @@ impl Bank {
|
|||
);
|
||||
check_time.stop();
|
||||
|
||||
let program_owners: Vec<Pubkey> = vec![
|
||||
bpf_loader_upgradeable::id(),
|
||||
bpf_loader::id(),
|
||||
bpf_loader_deprecated::id(),
|
||||
native_loader::id(),
|
||||
];
|
||||
|
||||
let _program_owners_refs: Vec<&Pubkey> = program_owners.iter().collect();
|
||||
// The following code is currently commented out. This is how the new cache will
|
||||
// finally be used, once rest of the code blocks are in place.
|
||||
/*
|
||||
let (program_accounts_map, loaded_programs_map) = self.load_and_get_programs_from_cache(
|
||||
&program_owners_refs,
|
||||
sanitized_txs,
|
||||
&check_results,
|
||||
);
|
||||
*/
|
||||
|
||||
let mut load_time = Measure::start("accounts_load");
|
||||
let mut loaded_transactions = self.rc.accounts.load_accounts(
|
||||
&self.ancestors,
|
||||
|
|
|
@ -7706,10 +7706,10 @@ fn test_bank_executor_cache() {
|
|||
// do work
|
||||
let mut executors =
|
||||
TransactionExecutorCache::new((2..3).map(|i| (accounts[i].0, executor.clone())));
|
||||
executors.set(key1, executor.clone(), false, true);
|
||||
executors.set(key2, executor.clone(), false, true);
|
||||
executors.set(key3, executor.clone(), true, true);
|
||||
executors.set(key4, executor, false, true);
|
||||
executors.set(key1, executor.clone(), false, true, bank.slot());
|
||||
executors.set(key2, executor.clone(), false, true, bank.slot());
|
||||
executors.set(key3, executor.clone(), true, true, bank.slot());
|
||||
executors.set(key4, executor, false, true, bank.slot());
|
||||
let executors = Rc::new(RefCell::new(executors));
|
||||
|
||||
// store Missing
|
||||
|
@ -7848,7 +7848,7 @@ fn test_bank_executor_cow() {
|
|||
|
||||
// add one to root bank
|
||||
let mut executors = TransactionExecutorCache::default();
|
||||
executors.set(key1, executor.clone(), false, true);
|
||||
executors.set(key1, executor.clone(), false, true, root.slot());
|
||||
let executors = Rc::new(RefCell::new(executors));
|
||||
root.store_executors_which_added_to_the_cache(&executors);
|
||||
let executors = root.get_tx_executor_cache(accounts);
|
||||
|
@ -7863,7 +7863,7 @@ fn test_bank_executor_cow() {
|
|||
assert_eq!(executors.borrow().visible.len(), 1);
|
||||
|
||||
let mut executors = TransactionExecutorCache::default();
|
||||
executors.set(key2, executor, false, true);
|
||||
executors.set(key2, executor, false, true, fork1.slot());
|
||||
let executors = Rc::new(RefCell::new(executors));
|
||||
fork1.store_executors_which_added_to_the_cache(&executors);
|
||||
|
||||
|
|
Loading…
Reference in New Issue