Respect randomized transaction order when unlocking accounts (#5918)

This commit is contained in:
Michael Vines 2019-09-16 21:45:16 -07:00 committed by GitHub
parent 7459eb15c3
commit f10438d530
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 85 additions and 82 deletions

View File

@ -490,7 +490,7 @@ impl BankingStage {
// TODO: Banking stage threads should be prioritized to complete faster then this queue
// expires.
let (mut loaded_accounts, results, mut retryable_txs, tx_count, signature_count) =
bank.load_and_execute_transactions(txs, None, lock_results, MAX_PROCESSING_AGE);
bank.load_and_execute_transactions(txs, lock_results, MAX_PROCESSING_AGE);
load_execute_time.stop();
let freeze_lock = bank.freeze_lock();

View File

@ -37,46 +37,35 @@ fn first_err(results: &[Result<()>]) -> Result<()> {
Ok(())
}
fn par_execute_entries(
bank: &Bank,
entries: &[(&Entry, LockedAccountsResults, bool, Vec<usize>)],
) -> Result<()> {
fn par_execute_entries(bank: &Bank, entries: &[(&Entry, LockedAccountsResults)]) -> Result<()> {
inc_new_counter_debug!("bank-par_execute_entries-count", entries.len());
let results: Vec<Result<()>> = PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
entries
.into_par_iter()
.map(
|(e, locked_accounts, randomize_tx_order, random_txs_execution_order)| {
let tx_execution_order: Option<&[usize]> = if *randomize_tx_order {
Some(random_txs_execution_order)
} else {
None
};
let results = bank.load_execute_and_commit_transactions(
&e.transactions,
tx_execution_order,
locked_accounts,
MAX_RECENT_BLOCKHASHES,
);
let mut first_err = None;
for (r, tx) in results.iter().zip(e.transactions.iter()) {
if let Err(ref e) = r {
if first_err.is_none() {
first_err = Some(r.clone());
}
if !Bank::can_commit(&r) {
warn!("Unexpected validator error: {:?}, tx: {:?}", e, tx);
datapoint_error!(
"validator_process_entry_error",
("error", format!("error: {:?}, tx: {:?}", e, tx), String)
);
}
.map(|(e, locked_accounts)| {
let results = bank.load_execute_and_commit_transactions(
&e.transactions,
locked_accounts,
MAX_RECENT_BLOCKHASHES,
);
let mut first_err = None;
for (r, tx) in results.iter().zip(e.transactions.iter()) {
if let Err(ref e) = r {
if first_err.is_none() {
first_err = Some(r.clone());
}
if !Bank::can_commit(&r) {
warn!("Unexpected validator error: {:?}, tx: {:?}", e, tx);
datapoint_error!(
"validator_process_entry_error",
("error", format!("error: {:?}, tx: {:?}", e, tx), String)
);
}
}
first_err.unwrap_or(Ok(()))
},
)
}
first_err.unwrap_or(Ok(()))
})
.collect()
})
});
@ -106,16 +95,11 @@ pub fn process_entries(
}
// else loop on processing the entry
loop {
// random_txs_execution_order need to be seperately defined apart from txs_execution_order,
// to satisfy borrow checker.
let mut random_txs_execution_order: Vec<usize> = vec![];
if randomize_tx_execution_order {
random_txs_execution_order = (0..entry.transactions.len()).collect();
let txs_execution_order = if randomize_tx_execution_order {
let mut random_txs_execution_order: Vec<usize> =
(0..entry.transactions.len()).collect();
random_txs_execution_order.shuffle(&mut thread_rng());
}
let txs_execution_order: Option<&[usize]> = if randomize_tx_execution_order {
Some(&random_txs_execution_order)
Some(random_txs_execution_order)
} else {
None
};
@ -127,13 +111,7 @@ pub fn process_entries(
// if locking worked
if first_lock_err.is_ok() {
// push the entry to the mt_group
mt_group.push((
entry,
lock_results,
randomize_tx_execution_order,
random_txs_execution_order,
));
mt_group.push((entry, lock_results));
// done with this entry
break;
}

View File

@ -382,14 +382,14 @@ impl Accounts {
.map_or(false, |lock| *lock.lock_count.lock().unwrap() > 0)
{
error_counters.account_in_use += 1;
debug!("Account in use: {:?}", k);
debug!("CD Account in use: {:?}", k);
return Err(TransactionError::AccountInUse);
}
}
for k in credit_only_keys.iter() {
if locks.contains(k) {
error_counters.account_in_use += 1;
debug!("Account in use: {:?}", k);
debug!("CO Account in use: {:?}", k);
return Err(TransactionError::AccountInUse);
}
}
@ -509,13 +509,21 @@ impl Accounts {
}
/// Once accounts are unlocked, new transactions that modify that state can enter the pipeline
pub fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) {
pub fn unlock_accounts(
&self,
txs: &[Transaction],
txs_iteration_order: Option<&[usize]>,
results: &[Result<()>],
) {
let mut account_locks = self.account_locks.lock().unwrap();
let credit_only_locks = self.credit_only_account_locks.clone();
debug!("bank unlock accounts");
txs.iter().zip(results.iter()).for_each(|(tx, result)| {
Self::unlock_account(tx, result, &mut account_locks, &credit_only_locks)
});
OrderedIterator::new(txs, txs_iteration_order)
.zip(results.iter())
.for_each(|(tx, result)| {
Self::unlock_account(tx, result, &mut account_locks, &credit_only_locks)
});
}
pub fn has_accounts(&self, fork: Fork) -> bool {
@ -1324,8 +1332,8 @@ mod tests {
2
);
accounts.unlock_accounts(&[tx], &results0);
accounts.unlock_accounts(&txs, &results1);
accounts.unlock_accounts(&[tx], None, &results0);
accounts.unlock_accounts(&txs, None, &results1);
let instructions = vec![CompiledInstruction::new(2, &(), vec![0, 1])];
let message = Message::new_with_compiled_instructions(
@ -1405,7 +1413,7 @@ mod tests {
counter_clone.clone().fetch_add(1, Ordering::SeqCst);
}
}
accounts_clone.unlock_accounts(&txs, &results);
accounts_clone.unlock_accounts(&txs, None, &results);
if exit_clone.clone().load(Ordering::Relaxed) {
break;
}
@ -1420,7 +1428,7 @@ mod tests {
thread::sleep(time::Duration::from_millis(50));
assert_eq!(counter_value, counter_clone.clone().load(Ordering::SeqCst));
}
accounts_arc.unlock_accounts(&txs, &results);
accounts_arc.unlock_accounts(&txs, None, &results);
thread::sleep(time::Duration::from_millis(50));
}
exit.store(true, Ordering::Relaxed);

View File

@ -768,15 +768,18 @@ impl Bank {
pub fn lock_accounts<'a, 'b>(
&'a self,
txs: &'b [Transaction],
txs_iteration_order: Option<&[usize]>,
txs_iteration_order: Option<Vec<usize>>,
) -> LockedAccountsResults<'a, 'b> {
if self.is_frozen() {
warn!("=========== FIXME: lock_accounts() working on a frozen bank! ================");
}
// TODO: put this assert back in
// assert!(!self.is_frozen());
let results = self.rc.accounts.lock_accounts(txs, txs_iteration_order);
LockedAccountsResults::new(results, &self, txs)
let results = self
.rc
.accounts
.lock_accounts(txs, txs_iteration_order.as_ref().map(|v| v.as_slice()));
LockedAccountsResults::new(results, &self, txs, txs_iteration_order)
}
pub fn unlock_accounts(&self, locked_accounts_results: &mut LockedAccountsResults) {
@ -784,6 +787,7 @@ impl Bank {
locked_accounts_results.needs_unlock = false;
self.rc.accounts.unlock_accounts(
locked_accounts_results.transactions(),
locked_accounts_results.txs_iteration_order(),
locked_accounts_results.locked_accounts_results(),
)
}
@ -956,7 +960,6 @@ impl Bank {
pub fn load_and_execute_transactions(
&self,
txs: &[Transaction],
txs_iteration_order: Option<&[usize]>,
lock_results: &LockedAccountsResults,
max_age: usize,
) -> (
@ -971,32 +974,41 @@ impl Bank {
let mut error_counters = ErrorCounters::default();
let mut load_time = Measure::start("accounts_load");
let retryable_txs: Vec<_> =
OrderedIterator::new(lock_results.locked_accounts_results(), txs_iteration_order)
.enumerate()
.filter_map(|(index, res)| match res {
Err(TransactionError::AccountInUse) => Some(index),
Ok(_) => None,
Err(_) => None,
})
.collect();
let retryable_txs: Vec<_> = OrderedIterator::new(
lock_results.locked_accounts_results(),
lock_results.txs_iteration_order(),
)
.enumerate()
.filter_map(|(index, res)| match res {
Err(TransactionError::AccountInUse) => Some(index),
Ok(_) => None,
Err(_) => None,
})
.collect();
let sig_results = self.check_transactions(
txs,
txs_iteration_order,
lock_results.txs_iteration_order(),
lock_results.locked_accounts_results(),
max_age,
&mut error_counters,
);
let mut loaded_accounts =
self.load_accounts(txs, txs_iteration_order, sig_results, &mut error_counters);
let mut loaded_accounts = self.load_accounts(
txs,
lock_results.txs_iteration_order(),
sig_results,
&mut error_counters,
);
load_time.stop();
let mut execution_time = Measure::start("execution_time");
let mut signature_count = 0;
let executed: Vec<Result<()>> = loaded_accounts
.iter_mut()
.zip(OrderedIterator::new(txs, txs_iteration_order))
.zip(OrderedIterator::new(
txs,
lock_results.txs_iteration_order(),
))
.map(|(accs, tx)| match accs {
Err(e) => Err(e.clone()),
Ok((ref mut accounts, ref mut loaders, ref mut credits, ref mut _rents)) => {
@ -1134,16 +1146,15 @@ impl Bank {
pub fn load_execute_and_commit_transactions(
&self,
txs: &[Transaction],
txs_iteration_order: Option<&[usize]>,
lock_results: &LockedAccountsResults,
max_age: usize,
) -> Vec<Result<()>> {
let (mut loaded_accounts, executed, _, tx_count, signature_count) =
self.load_and_execute_transactions(txs, txs_iteration_order, lock_results, max_age);
self.load_and_execute_transactions(txs, lock_results, max_age);
self.commit_transactions(
txs,
txs_iteration_order,
lock_results.txs_iteration_order(),
&mut loaded_accounts,
&executed,
tx_count,
@ -1154,7 +1165,7 @@ impl Bank {
#[must_use]
pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> {
let lock_results = self.lock_accounts(txs, None);
self.load_execute_and_commit_transactions(txs, None, &lock_results, MAX_RECENT_BLOCKHASHES)
self.load_execute_and_commit_transactions(txs, &lock_results, MAX_RECENT_BLOCKHASHES)
}
/// Create, sign, and process a Transaction from `keypair` to `to` of
@ -2103,7 +2114,6 @@ mod tests {
let lock_result = bank.lock_accounts(&pay_alice, None);
let results_alice = bank.load_execute_and_commit_transactions(
&pay_alice,
None,
&lock_result,
MAX_RECENT_BLOCKHASHES,
);

View File

@ -6,6 +6,7 @@ pub struct LockedAccountsResults<'a, 'b> {
locked_accounts_results: Vec<Result<()>>,
bank: &'a Bank,
transactions: &'b [Transaction],
txs_iteration_order: Option<Vec<usize>>,
pub(crate) needs_unlock: bool,
}
@ -14,11 +15,13 @@ impl<'a, 'b> LockedAccountsResults<'a, 'b> {
locked_accounts_results: Vec<Result<()>>,
bank: &'a Bank,
transactions: &'b [Transaction],
txs_iteration_order: Option<Vec<usize>>,
) -> Self {
Self {
locked_accounts_results,
bank,
transactions,
txs_iteration_order,
needs_unlock: true,
}
}
@ -30,6 +33,10 @@ impl<'a, 'b> LockedAccountsResults<'a, 'b> {
pub fn transactions(&self) -> &[Transaction] {
self.transactions
}
pub fn txs_iteration_order(&self) -> Option<&[usize]> {
self.txs_iteration_order.as_ref().map(|v| v.as_slice())
}
}
// Unlock all locked accounts in destructor.