diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 43f31f5d3..2a3fd711d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -490,7 +490,7 @@ impl BankingStage { // TODO: Banking stage threads should be prioritized to complete faster then this queue // expires. let (mut loaded_accounts, results, mut retryable_txs, tx_count, signature_count) = - bank.load_and_execute_transactions(txs, None, lock_results, MAX_PROCESSING_AGE); + bank.load_and_execute_transactions(txs, lock_results, MAX_PROCESSING_AGE); load_execute_time.stop(); let freeze_lock = bank.freeze_lock(); diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index 02a71f019..07a04d8d7 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -37,46 +37,35 @@ fn first_err(results: &[Result<()>]) -> Result<()> { Ok(()) } -fn par_execute_entries( - bank: &Bank, - entries: &[(&Entry, LockedAccountsResults, bool, Vec)], -) -> Result<()> { +fn par_execute_entries(bank: &Bank, entries: &[(&Entry, LockedAccountsResults)]) -> Result<()> { inc_new_counter_debug!("bank-par_execute_entries-count", entries.len()); let results: Vec> = PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { entries .into_par_iter() - .map( - |(e, locked_accounts, randomize_tx_order, random_txs_execution_order)| { - let tx_execution_order: Option<&[usize]> = if *randomize_tx_order { - Some(random_txs_execution_order) - } else { - None - }; - let results = bank.load_execute_and_commit_transactions( - &e.transactions, - tx_execution_order, - locked_accounts, - MAX_RECENT_BLOCKHASHES, - ); - let mut first_err = None; - for (r, tx) in results.iter().zip(e.transactions.iter()) { - if let Err(ref e) = r { - if first_err.is_none() { - first_err = Some(r.clone()); - } - if !Bank::can_commit(&r) { - warn!("Unexpected validator error: {:?}, tx: {:?}", e, tx); - datapoint_error!( - "validator_process_entry_error", - ("error", format!("error: {:?}, tx: {:?}", e, tx), String) - ); - } + .map(|(e, locked_accounts)| { + let results = bank.load_execute_and_commit_transactions( + &e.transactions, + locked_accounts, + MAX_RECENT_BLOCKHASHES, + ); + let mut first_err = None; + for (r, tx) in results.iter().zip(e.transactions.iter()) { + if let Err(ref e) = r { + if first_err.is_none() { + first_err = Some(r.clone()); + } + if !Bank::can_commit(&r) { + warn!("Unexpected validator error: {:?}, tx: {:?}", e, tx); + datapoint_error!( + "validator_process_entry_error", + ("error", format!("error: {:?}, tx: {:?}", e, tx), String) + ); } } - first_err.unwrap_or(Ok(())) - }, - ) + } + first_err.unwrap_or(Ok(())) + }) .collect() }) }); @@ -106,16 +95,11 @@ pub fn process_entries( } // else loop on processing the entry loop { - // random_txs_execution_order need to be seperately defined apart from txs_execution_order, - // to satisfy borrow checker. - let mut random_txs_execution_order: Vec = vec![]; - if randomize_tx_execution_order { - random_txs_execution_order = (0..entry.transactions.len()).collect(); + let txs_execution_order = if randomize_tx_execution_order { + let mut random_txs_execution_order: Vec = + (0..entry.transactions.len()).collect(); random_txs_execution_order.shuffle(&mut thread_rng()); - } - - let txs_execution_order: Option<&[usize]> = if randomize_tx_execution_order { - Some(&random_txs_execution_order) + Some(random_txs_execution_order) } else { None }; @@ -127,13 +111,7 @@ pub fn process_entries( // if locking worked if first_lock_err.is_ok() { - // push the entry to the mt_group - mt_group.push(( - entry, - lock_results, - randomize_tx_execution_order, - random_txs_execution_order, - )); + mt_group.push((entry, lock_results)); // done with this entry break; } diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 15341c200..e9e6446b1 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -382,14 +382,14 @@ impl Accounts { .map_or(false, |lock| *lock.lock_count.lock().unwrap() > 0) { error_counters.account_in_use += 1; - debug!("Account in use: {:?}", k); + debug!("CD Account in use: {:?}", k); return Err(TransactionError::AccountInUse); } } for k in credit_only_keys.iter() { if locks.contains(k) { error_counters.account_in_use += 1; - debug!("Account in use: {:?}", k); + debug!("CO Account in use: {:?}", k); return Err(TransactionError::AccountInUse); } } @@ -509,13 +509,21 @@ impl Accounts { } /// Once accounts are unlocked, new transactions that modify that state can enter the pipeline - pub fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) { + pub fn unlock_accounts( + &self, + txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, + results: &[Result<()>], + ) { let mut account_locks = self.account_locks.lock().unwrap(); let credit_only_locks = self.credit_only_account_locks.clone(); debug!("bank unlock accounts"); - txs.iter().zip(results.iter()).for_each(|(tx, result)| { - Self::unlock_account(tx, result, &mut account_locks, &credit_only_locks) - }); + + OrderedIterator::new(txs, txs_iteration_order) + .zip(results.iter()) + .for_each(|(tx, result)| { + Self::unlock_account(tx, result, &mut account_locks, &credit_only_locks) + }); } pub fn has_accounts(&self, fork: Fork) -> bool { @@ -1324,8 +1332,8 @@ mod tests { 2 ); - accounts.unlock_accounts(&[tx], &results0); - accounts.unlock_accounts(&txs, &results1); + accounts.unlock_accounts(&[tx], None, &results0); + accounts.unlock_accounts(&txs, None, &results1); let instructions = vec![CompiledInstruction::new(2, &(), vec![0, 1])]; let message = Message::new_with_compiled_instructions( @@ -1405,7 +1413,7 @@ mod tests { counter_clone.clone().fetch_add(1, Ordering::SeqCst); } } - accounts_clone.unlock_accounts(&txs, &results); + accounts_clone.unlock_accounts(&txs, None, &results); if exit_clone.clone().load(Ordering::Relaxed) { break; } @@ -1420,7 +1428,7 @@ mod tests { thread::sleep(time::Duration::from_millis(50)); assert_eq!(counter_value, counter_clone.clone().load(Ordering::SeqCst)); } - accounts_arc.unlock_accounts(&txs, &results); + accounts_arc.unlock_accounts(&txs, None, &results); thread::sleep(time::Duration::from_millis(50)); } exit.store(true, Ordering::Relaxed); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 678beb063..afd088c60 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -768,15 +768,18 @@ impl Bank { pub fn lock_accounts<'a, 'b>( &'a self, txs: &'b [Transaction], - txs_iteration_order: Option<&[usize]>, + txs_iteration_order: Option>, ) -> LockedAccountsResults<'a, 'b> { if self.is_frozen() { warn!("=========== FIXME: lock_accounts() working on a frozen bank! ================"); } // TODO: put this assert back in // assert!(!self.is_frozen()); - let results = self.rc.accounts.lock_accounts(txs, txs_iteration_order); - LockedAccountsResults::new(results, &self, txs) + let results = self + .rc + .accounts + .lock_accounts(txs, txs_iteration_order.as_ref().map(|v| v.as_slice())); + LockedAccountsResults::new(results, &self, txs, txs_iteration_order) } pub fn unlock_accounts(&self, locked_accounts_results: &mut LockedAccountsResults) { @@ -784,6 +787,7 @@ impl Bank { locked_accounts_results.needs_unlock = false; self.rc.accounts.unlock_accounts( locked_accounts_results.transactions(), + locked_accounts_results.txs_iteration_order(), locked_accounts_results.locked_accounts_results(), ) } @@ -956,7 +960,6 @@ impl Bank { pub fn load_and_execute_transactions( &self, txs: &[Transaction], - txs_iteration_order: Option<&[usize]>, lock_results: &LockedAccountsResults, max_age: usize, ) -> ( @@ -971,32 +974,41 @@ impl Bank { let mut error_counters = ErrorCounters::default(); let mut load_time = Measure::start("accounts_load"); - let retryable_txs: Vec<_> = - OrderedIterator::new(lock_results.locked_accounts_results(), txs_iteration_order) - .enumerate() - .filter_map(|(index, res)| match res { - Err(TransactionError::AccountInUse) => Some(index), - Ok(_) => None, - Err(_) => None, - }) - .collect(); + let retryable_txs: Vec<_> = OrderedIterator::new( + lock_results.locked_accounts_results(), + lock_results.txs_iteration_order(), + ) + .enumerate() + .filter_map(|(index, res)| match res { + Err(TransactionError::AccountInUse) => Some(index), + Ok(_) => None, + Err(_) => None, + }) + .collect(); let sig_results = self.check_transactions( txs, - txs_iteration_order, + lock_results.txs_iteration_order(), lock_results.locked_accounts_results(), max_age, &mut error_counters, ); - let mut loaded_accounts = - self.load_accounts(txs, txs_iteration_order, sig_results, &mut error_counters); + let mut loaded_accounts = self.load_accounts( + txs, + lock_results.txs_iteration_order(), + sig_results, + &mut error_counters, + ); load_time.stop(); let mut execution_time = Measure::start("execution_time"); let mut signature_count = 0; let executed: Vec> = loaded_accounts .iter_mut() - .zip(OrderedIterator::new(txs, txs_iteration_order)) + .zip(OrderedIterator::new( + txs, + lock_results.txs_iteration_order(), + )) .map(|(accs, tx)| match accs { Err(e) => Err(e.clone()), Ok((ref mut accounts, ref mut loaders, ref mut credits, ref mut _rents)) => { @@ -1134,16 +1146,15 @@ impl Bank { pub fn load_execute_and_commit_transactions( &self, txs: &[Transaction], - txs_iteration_order: Option<&[usize]>, lock_results: &LockedAccountsResults, max_age: usize, ) -> Vec> { let (mut loaded_accounts, executed, _, tx_count, signature_count) = - self.load_and_execute_transactions(txs, txs_iteration_order, lock_results, max_age); + self.load_and_execute_transactions(txs, lock_results, max_age); self.commit_transactions( txs, - txs_iteration_order, + lock_results.txs_iteration_order(), &mut loaded_accounts, &executed, tx_count, @@ -1154,7 +1165,7 @@ impl Bank { #[must_use] pub fn process_transactions(&self, txs: &[Transaction]) -> Vec> { let lock_results = self.lock_accounts(txs, None); - self.load_execute_and_commit_transactions(txs, None, &lock_results, MAX_RECENT_BLOCKHASHES) + self.load_execute_and_commit_transactions(txs, &lock_results, MAX_RECENT_BLOCKHASHES) } /// Create, sign, and process a Transaction from `keypair` to `to` of @@ -2103,7 +2114,6 @@ mod tests { let lock_result = bank.lock_accounts(&pay_alice, None); let results_alice = bank.load_execute_and_commit_transactions( &pay_alice, - None, &lock_result, MAX_RECENT_BLOCKHASHES, ); diff --git a/runtime/src/locked_accounts_results.rs b/runtime/src/locked_accounts_results.rs index ed228fef9..b50840cb9 100644 --- a/runtime/src/locked_accounts_results.rs +++ b/runtime/src/locked_accounts_results.rs @@ -6,6 +6,7 @@ pub struct LockedAccountsResults<'a, 'b> { locked_accounts_results: Vec>, bank: &'a Bank, transactions: &'b [Transaction], + txs_iteration_order: Option>, pub(crate) needs_unlock: bool, } @@ -14,11 +15,13 @@ impl<'a, 'b> LockedAccountsResults<'a, 'b> { locked_accounts_results: Vec>, bank: &'a Bank, transactions: &'b [Transaction], + txs_iteration_order: Option>, ) -> Self { Self { locked_accounts_results, bank, transactions, + txs_iteration_order, needs_unlock: true, } } @@ -30,6 +33,10 @@ impl<'a, 'b> LockedAccountsResults<'a, 'b> { pub fn transactions(&self) -> &[Transaction] { self.transactions } + + pub fn txs_iteration_order(&self) -> Option<&[usize]> { + self.txs_iteration_order.as_ref().map(|v| v.as_slice()) + } } // Unlock all locked accounts in destructor.