Add limit to looping in banking-stage (#35342)

This commit is contained in:
Pankaj Garg 2024-02-28 17:36:45 -08:00 committed by GitHub
parent 312f786abf
commit 990ca1d0b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 35 additions and 9 deletions

View File

@ -598,7 +598,8 @@ impl Consumer {
transaction_status_sender_enabled, transaction_status_sender_enabled,
&mut execute_and_commit_timings.execute_timings, &mut execute_and_commit_timings.execute_timings,
None, // account_overrides None, // account_overrides
self.log_messages_bytes_limit self.log_messages_bytes_limit,
true,
)); ));
execute_and_commit_timings.load_execute_us = load_execute_us; execute_and_commit_timings.load_execute_us = load_execute_us;

View File

@ -195,7 +195,7 @@ impl Stats {
("reloads", reloads, i64), ("reloads", reloads, i64),
("insertions", insertions, i64), ("insertions", insertions, i64),
("lost_insertions", lost_insertions, i64), ("lost_insertions", lost_insertions, i64),
("replacements", replacements, i64), ("replace_entry", replacements, i64),
("one_hit_wonders", one_hit_wonders, i64), ("one_hit_wonders", one_hit_wonders, i64),
("prunes_orphan", prunes_orphan, i64), ("prunes_orphan", prunes_orphan, i64),
("prunes_environment", prunes_environment, i64), ("prunes_environment", prunes_environment, i64),
@ -618,6 +618,7 @@ pub struct LoadedProgramsForTxBatch {
entries: HashMap<Pubkey, Arc<LoadedProgram>>, entries: HashMap<Pubkey, Arc<LoadedProgram>>,
slot: Slot, slot: Slot,
pub environments: ProgramRuntimeEnvironments, pub environments: ProgramRuntimeEnvironments,
pub hit_max_limit: bool,
} }
impl LoadedProgramsForTxBatch { impl LoadedProgramsForTxBatch {
@ -626,6 +627,7 @@ impl LoadedProgramsForTxBatch {
entries: HashMap::new(), entries: HashMap::new(),
slot, slot,
environments, environments,
hit_max_limit: false,
} }
} }
@ -964,7 +966,7 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
slot: Slot, slot: Slot,
key: Pubkey, key: Pubkey,
loaded_program: Arc<LoadedProgram>, loaded_program: Arc<LoadedProgram>,
) { ) -> bool {
let second_level = self.entries.entry(key).or_default(); let second_level = self.entries.entry(key).or_default();
debug_assert_eq!( debug_assert_eq!(
second_level.cooperative_loading_lock, second_level.cooperative_loading_lock,
@ -985,8 +987,9 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
{ {
self.stats.lost_insertions.fetch_add(1, Ordering::Relaxed); self.stats.lost_insertions.fetch_add(1, Ordering::Relaxed);
} }
self.assign_program(key, loaded_program); let was_occupied = self.assign_program(key, loaded_program);
self.loading_task_waiter.notify(); self.loading_task_waiter.notify();
was_occupied
} }
pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) { pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) {

View File

@ -4299,6 +4299,7 @@ impl Bank {
&mut timings, &mut timings,
Some(&account_overrides), Some(&account_overrides),
None, None,
true,
); );
let post_simulation_accounts = loaded_transactions let post_simulation_accounts = loaded_transactions
@ -4537,7 +4538,7 @@ impl Bank {
balances balances
} }
#[allow(clippy::type_complexity)] #[allow(clippy::too_many_arguments, clippy::type_complexity)]
pub fn load_and_execute_transactions( pub fn load_and_execute_transactions(
&self, &self,
batch: &TransactionBatch, batch: &TransactionBatch,
@ -4548,6 +4549,7 @@ impl Bank {
timings: &mut ExecuteTimings, timings: &mut ExecuteTimings,
account_overrides: Option<&AccountOverrides>, account_overrides: Option<&AccountOverrides>,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
limit_to_load_programs: bool,
) -> LoadAndExecuteTransactionsOutput { ) -> LoadAndExecuteTransactionsOutput {
let sanitized_txs = batch.sanitized_transactions(); let sanitized_txs = batch.sanitized_transactions();
debug!("processing transactions: {}", sanitized_txs.len()); debug!("processing transactions: {}", sanitized_txs.len());
@ -4614,6 +4616,7 @@ impl Bank {
account_overrides, account_overrides,
self.builtin_programs.iter(), self.builtin_programs.iter(),
log_messages_bytes_limit, log_messages_bytes_limit,
limit_to_load_programs,
); );
let mut signature_count = 0; let mut signature_count = 0;
@ -5663,6 +5666,7 @@ impl Bank {
timings, timings,
None, None,
log_messages_bytes_limit, log_messages_bytes_limit,
false,
); );
let (last_blockhash, lamports_per_signature) = let (last_blockhash, lamports_per_signature) =

View File

@ -190,6 +190,7 @@ impl<FG: ForkGraph> TransactionBatchProcessor<FG> {
account_overrides: Option<&AccountOverrides>, account_overrides: Option<&AccountOverrides>,
builtin_programs: impl Iterator<Item = &'a Pubkey>, builtin_programs: impl Iterator<Item = &'a Pubkey>,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
limit_to_load_programs: bool,
) -> LoadAndExecuteSanitizedTransactionsOutput { ) -> LoadAndExecuteSanitizedTransactionsOutput {
let mut program_accounts_map = Self::filter_executable_program_accounts( let mut program_accounts_map = Self::filter_executable_program_accounts(
callbacks, callbacks,
@ -202,9 +203,18 @@ impl<FG: ForkGraph> TransactionBatchProcessor<FG> {
program_accounts_map.insert(*builtin_program, (&native_loader, 0)); program_accounts_map.insert(*builtin_program, (&native_loader, 0));
} }
let programs_loaded_for_tx_batch = Rc::new(RefCell::new( let programs_loaded_for_tx_batch = Rc::new(RefCell::new(self.replenish_program_cache(
self.replenish_program_cache(callbacks, &program_accounts_map), callbacks,
)); &program_accounts_map,
limit_to_load_programs,
)));
if programs_loaded_for_tx_batch.borrow().hit_max_limit {
return LoadAndExecuteSanitizedTransactionsOutput {
loaded_transactions: vec![],
execution_results: vec![],
};
}
let mut load_time = Measure::start("accounts_load"); let mut load_time = Measure::start("accounts_load");
let mut loaded_transactions = load_accounts( let mut loaded_transactions = load_accounts(
@ -356,6 +366,7 @@ impl<FG: ForkGraph> TransactionBatchProcessor<FG> {
&self, &self,
callback: &CB, callback: &CB,
program_accounts_map: &HashMap<Pubkey, (&Pubkey, u64)>, program_accounts_map: &HashMap<Pubkey, (&Pubkey, u64)>,
limit_to_load_programs: bool,
) -> LoadedProgramsForTxBatch { ) -> LoadedProgramsForTxBatch {
let mut missing_programs: Vec<(Pubkey, (LoadedProgramMatchCriteria, u64))> = let mut missing_programs: Vec<(Pubkey, (LoadedProgramMatchCriteria, u64))> =
if self.check_program_modification_slot { if self.check_program_modification_slot {
@ -401,7 +412,14 @@ impl<FG: ForkGraph> TransactionBatchProcessor<FG> {
} }
// Submit our last completed loading task. // Submit our last completed loading task.
if let Some((key, program)) = program_to_store.take() { if let Some((key, program)) = program_to_store.take() {
loaded_programs_cache.finish_cooperative_loading_task(self.slot, key, program); if loaded_programs_cache
.finish_cooperative_loading_task(self.slot, key, program)
&& limit_to_load_programs
{
let mut ret = LoadedProgramsForTxBatch::default();
ret.hit_max_limit = true;
return ret;
}
} }
// Figure out which program needs to be loaded next. // Figure out which program needs to be loaded next.
let program_to_load = loaded_programs_cache.extract( let program_to_load = loaded_programs_cache.extract(