diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 54e4d1696e..9ec955cddc 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -10,14 +10,18 @@ use rand::{thread_rng, Rng}; use rayon::prelude::*; use solana_core::banking_stage::{create_test_recorder, BankingStage}; use solana_core::blocktree::{get_tmp_ledger_path, Blocktree}; +use solana_core::blocktree_processor::process_entries; use solana_core::cluster_info::ClusterInfo; use solana_core::cluster_info::Node; +use solana_core::entry::next_hash; +use solana_core::entry::Entry; use solana_core::genesis_utils::{create_genesis_block, GenesisBlockInfo}; use solana_core::packet::to_packets_chunked; use solana_core::poh_recorder::WorkingBankEntries; use solana_core::service::Service; use solana_core::test_tx::test_tx; use solana_runtime::bank::Bank; +use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Keypair; @@ -264,3 +268,83 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { bench_banking(bencher, TransactionType::Programs); } + +fn simulate_process_entries( + randomize_txs: bool, + mint_keypair: &Keypair, + mut tx_vector: Vec, + genesis_block: &GenesisBlock, + keypairs: &Vec, + initial_lamports: u64, + num_accounts: usize, +) { + let bank = Bank::new(genesis_block); + + for i in 0..(num_accounts / 2) { + bank.transfer(initial_lamports, mint_keypair, &keypairs[i * 2].pubkey()) + .unwrap(); + } + + for i in (0..num_accounts).step_by(2) { + tx_vector.push(system_transaction::transfer( + &keypairs[i], + &keypairs[i + 1].pubkey(), + initial_lamports, + bank.last_blockhash(), + )); + } + + // Transfer lamports to each other + let entry = Entry { + num_hashes: 1, + hash: next_hash(&bank.last_blockhash(), 1, &tx_vector), + transactions: tx_vector, + }; + process_entries(&bank, &vec![entry], randomize_txs).unwrap(); +} + +fn bench_process_entries(randomize_txs: bool, bencher: &mut Bencher) { + // entropy multiplier should be big enough to provide sufficient entropy + // but small enough to not take too much time while executing the test. + let entropy_multiplier: usize = 25; + let initial_lamports = 100; + + // number of accounts need to be in multiple of 4 for correct + // execution of the test. + let num_accounts = entropy_multiplier * 4; + let GenesisBlockInfo { + genesis_block, + mint_keypair, + .. + } = create_genesis_block((num_accounts + 1) as u64 * initial_lamports); + + let mut keypairs: Vec = vec![]; + let tx_vector: Vec = Vec::with_capacity(num_accounts / 2); + + for _ in 0..num_accounts { + let keypair = Keypair::new(); + keypairs.push(keypair); + } + + bencher.iter(|| { + simulate_process_entries( + randomize_txs, + &mint_keypair, + tx_vector.clone(), + &genesis_block, + &keypairs, + initial_lamports, + num_accounts, + ); + }); +} + +#[bench] +fn bench_process_entries_without_order_shuffeling(bencher: &mut Bencher) { + bench_process_entries(false, bencher); +} + +#[bench] +fn bench_process_entries_with_order_shuffeling(bencher: &mut Bencher) { + bench_process_entries(true, bencher); +} diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 289258a842..5a32c1d69e 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -489,7 +489,7 @@ impl BankingStage { // TODO: Banking stage threads should be prioritized to complete faster then this queue // expires. let (mut loaded_accounts, results, mut retryable_txs, tx_count, signature_count) = - bank.load_and_execute_transactions(txs, lock_results, MAX_PROCESSING_AGE); + bank.load_and_execute_transactions(txs, None, lock_results, MAX_PROCESSING_AGE); load_execute_time.stop(); let freeze_lock = bank.freeze_lock(); @@ -510,6 +510,7 @@ impl BankingStage { if num_to_commit != 0 { bank.commit_transactions( txs, + None, &mut loaded_accounts, &results, tx_count, @@ -541,7 +542,7 @@ impl BankingStage { let mut lock_time = Measure::start("lock_time"); // Once accounts are locked, other threads cannot encode transactions that will modify the // same account state - let lock_results = bank.lock_accounts(txs); + let lock_results = bank.lock_accounts(txs, None); lock_time.stop(); let (result, mut retryable_txs) = @@ -696,6 +697,7 @@ impl BankingStage { // Drop the transaction if it will expire by the time the next node receives and processes it let result = bank.check_transactions( transactions, + None, &filter, (MAX_PROCESSING_AGE) .saturating_sub(MAX_TRANSACTION_FORWARDING_DELAY) diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index ae37f6cc96..e1aa6f4bf0 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -15,6 +15,9 @@ use std::result; use std::sync::Arc; use std::time::{Duration, Instant}; +use rand::seq::SliceRandom; +use rand::thread_rng; + pub const NUM_THREADS: u32 = 10; use std::cell::RefCell; @@ -32,35 +35,46 @@ fn first_err(results: &[Result<()>]) -> Result<()> { Ok(()) } -fn par_execute_entries(bank: &Bank, entries: &[(&Entry, LockedAccountsResults)]) -> Result<()> { +fn par_execute_entries( + bank: &Bank, + entries: &[(&Entry, LockedAccountsResults, bool, Vec)], +) -> Result<()> { inc_new_counter_debug!("bank-par_execute_entries-count", entries.len()); let results: Vec> = PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { entries .into_par_iter() - .map(|(e, locked_accounts)| { - let results = bank.load_execute_and_commit_transactions( - &e.transactions, - locked_accounts, - MAX_RECENT_BLOCKHASHES, - ); - let mut first_err = None; - for (r, tx) in results.iter().zip(e.transactions.iter()) { - if let Err(ref e) = r { - if first_err.is_none() { - first_err = Some(r.clone()); - } - if !Bank::can_commit(&r) { - warn!("Unexpected validator error: {:?}, tx: {:?}", e, tx); - datapoint_error!( - "validator_process_entry_error", - ("error", format!("error: {:?}, tx: {:?}", e, tx), String) - ); + .map( + |(e, locked_accounts, randomize_tx_order, random_txs_execution_order)| { + let tx_execution_order: Option<&[usize]> = if *randomize_tx_order { + Some(random_txs_execution_order) + } else { + None + }; + let results = bank.load_execute_and_commit_transactions( + &e.transactions, + tx_execution_order, + locked_accounts, + MAX_RECENT_BLOCKHASHES, + ); + let mut first_err = None; + for (r, tx) in results.iter().zip(e.transactions.iter()) { + if let Err(ref e) = r { + if first_err.is_none() { + first_err = Some(r.clone()); + } + if !Bank::can_commit(&r) { + warn!("Unexpected validator error: {:?}, tx: {:?}", e, tx); + datapoint_error!( + "validator_process_entry_error", + ("error", format!("error: {:?}, tx: {:?}", e, tx), String) + ); + } } } - } - first_err.unwrap_or(Ok(())) - }) + first_err.unwrap_or(Ok(())) + }, + ) .collect() }) }); @@ -73,7 +87,11 @@ fn par_execute_entries(bank: &Bank, entries: &[(&Entry, LockedAccountsResults)]) /// 2. Process the locked group in parallel /// 3. Register the `Tick` if it's available /// 4. Update the leader scheduler, goto 1 -pub fn process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> { +pub fn process_entries( + bank: &Bank, + entries: &[Entry], + randomize_tx_execution_order: bool, +) -> Result<()> { // accumulator for entries that can be processed in parallel let mut mt_group = vec![]; for entry in entries { @@ -86,15 +104,34 @@ pub fn process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> { } // else loop on processing the entry loop { + // random_txs_execution_order need to be seperately defined apart from txs_execution_order, + // to satisfy borrow checker. + let mut random_txs_execution_order: Vec = vec![]; + if randomize_tx_execution_order { + random_txs_execution_order = (0..entry.transactions.len()).collect(); + random_txs_execution_order.shuffle(&mut thread_rng()); + } + + let txs_execution_order: Option<&[usize]> = if randomize_tx_execution_order { + Some(&random_txs_execution_order) + } else { + None + }; + // try to lock the accounts - let lock_results = bank.lock_accounts(&entry.transactions); + let lock_results = bank.lock_accounts(&entry.transactions, txs_execution_order); let first_lock_err = first_err(lock_results.locked_accounts_results()); // if locking worked if first_lock_err.is_ok() { // push the entry to the mt_group - mt_group.push((entry, lock_results)); + mt_group.push(( + entry, + lock_results, + randomize_tx_execution_order, + random_txs_execution_order, + )); // done with this entry break; } @@ -225,7 +262,7 @@ fn verify_and_process_entries( return Err(BlocktreeProcessorError::LedgerVerificationFailed); } - process_entries(&bank, &entries).map_err(|err| { + process_entries(&bank, &entries, true).map_err(|err| { warn!( "Failed to process entries for slot {}: {:?}", bank.slot(), @@ -417,6 +454,7 @@ pub mod tests { use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; + use solana_sdk::transaction::Transaction; use solana_sdk::transaction::TransactionError; pub fn fill_blocktree_slot_with_ticks( @@ -771,7 +809,7 @@ pub mod tests { ); // Now ensure the TX is accepted despite pointing to the ID of an empty entry. - process_entries(&bank, &slot_entries).unwrap(); + process_entries(&bank, &slot_entries, true).unwrap(); assert_eq!(bank.process_transaction(&tx), Ok(())); } @@ -868,7 +906,7 @@ pub mod tests { // ensure bank can process a tick assert_eq!(bank.tick_height(), 0); let tick = next_entry(&genesis_block.hash(), 1, vec![]); - assert_eq!(process_entries(&bank, &[tick.clone()]), Ok(())); + assert_eq!(process_entries(&bank, &[tick.clone()], true), Ok(())); assert_eq!(bank.tick_height(), 1); } @@ -900,7 +938,7 @@ pub mod tests { bank.last_blockhash(), ); let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]); - assert_eq!(process_entries(&bank, &[entry_1, entry_2]), Ok(())); + assert_eq!(process_entries(&bank, &[entry_1, entry_2], true), Ok(())); assert_eq!(bank.get_balance(&keypair1.pubkey()), 2); assert_eq!(bank.get_balance(&keypair2.pubkey()), 2); assert_eq!(bank.last_blockhash(), blockhash); @@ -954,7 +992,7 @@ pub mod tests { ); assert_eq!( - process_entries(&bank, &[entry_1_to_mint, entry_2_to_3_mint_to_1]), + process_entries(&bank, &[entry_1_to_mint, entry_2_to_3_mint_to_1], false), Ok(()) ); @@ -1022,7 +1060,8 @@ pub mod tests { assert!(process_entries( &bank, - &[entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()] + &[entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()], + false ) .is_err()); @@ -1033,13 +1072,13 @@ pub mod tests { // Check all accounts are unlocked let txs1 = &entry_1_to_mint.transactions[..]; let txs2 = &entry_2_to_3_mint_to_1.transactions[..]; - let locked_accounts1 = bank.lock_accounts(txs1); + let locked_accounts1 = bank.lock_accounts(txs1, None); for result in locked_accounts1.locked_accounts_results() { assert!(result.is_ok()); } // txs1 and txs2 have accounts that conflict, so we must drop txs1 first drop(locked_accounts1); - let locked_accounts2 = bank.lock_accounts(txs2); + let locked_accounts2 = bank.lock_accounts(txs2, None); for result in locked_accounts2.locked_accounts_results() { assert!(result.is_ok()); } @@ -1131,7 +1170,8 @@ pub mod tests { entry_1_to_mint.clone(), entry_2_to_3_and_1_to_mint.clone(), entry_conflict_itself.clone() - ] + ], + false ) .is_err()); @@ -1186,12 +1226,88 @@ pub mod tests { bank.last_blockhash(), ); let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]); - assert_eq!(process_entries(&bank, &[entry_1, entry_2]), Ok(())); + assert_eq!(process_entries(&bank, &[entry_1, entry_2], true), Ok(())); assert_eq!(bank.get_balance(&keypair3.pubkey()), 1); assert_eq!(bank.get_balance(&keypair4.pubkey()), 1); assert_eq!(bank.last_blockhash(), blockhash); } + #[test] + fn test_process_entry_tx_random_execution_no_error() { + // entropy multiplier should be big enough to provide sufficient entropy + // but small enough to not take too much time while executing the test. + let entropy_multiplier: usize = 25; + let initial_lamports = 100; + + // number of accounts need to be in multiple of 4 for correct + // execution of the test. + let num_accounts = entropy_multiplier * 4; + let GenesisBlockInfo { + genesis_block, + mint_keypair, + .. + } = create_genesis_block((num_accounts + 1) as u64 * initial_lamports); + + let bank = Bank::new(&genesis_block); + + let mut keypairs: Vec = vec![]; + + for _ in 0..num_accounts { + let keypair = Keypair::new(); + let create_account_tx = system_transaction::create_user_account( + &mint_keypair, + &keypair.pubkey(), + 0, + bank.last_blockhash(), + ); + assert_eq!(bank.process_transaction(&create_account_tx), Ok(())); + assert_matches!( + bank.transfer(initial_lamports, &mint_keypair, &keypair.pubkey()), + Ok(_) + ); + keypairs.push(keypair); + } + + let mut tx_vector: Vec = vec![]; + + for i in (0..num_accounts).step_by(4) { + tx_vector.append(&mut vec![ + system_transaction::transfer( + &keypairs[i + 1], + &keypairs[i].pubkey(), + initial_lamports, + bank.last_blockhash(), + ), + system_transaction::transfer( + &keypairs[i + 3], + &keypairs[i + 2].pubkey(), + initial_lamports, + bank.last_blockhash(), + ), + ]); + } + + // Transfer lamports to each other + let entry = next_entry(&bank.last_blockhash(), 1, tx_vector); + assert_eq!(process_entries(&bank, &vec![entry], true), Ok(())); + bank.squash(); + + // Even number keypair should have balance of 2 * initial_lamports and + // odd number keypair should have balance of 0, which proves + // that even in case of random order of execution, overall state remains + // consistent. + for i in 0..num_accounts { + if i % 2 == 0 { + assert_eq!( + bank.get_balance(&keypairs[i].pubkey()), + 2 * initial_lamports + ); + } else { + assert_eq!(bank.get_balance(&keypairs[i].pubkey()), 0); + } + } + } + #[test] fn test_process_entries_2_entries_tick() { let GenesisBlockInfo { @@ -1239,7 +1355,11 @@ pub mod tests { ); let entry_2 = next_entry(&tick.hash, 1, vec![tx]); assert_eq!( - process_entries(&bank, &[entry_1.clone(), tick.clone(), entry_2.clone()]), + process_entries( + &bank, + &[entry_1.clone(), tick.clone(), entry_2.clone()], + true + ), Ok(()) ); assert_eq!(bank.get_balance(&keypair3.pubkey()), 1); @@ -1254,7 +1374,7 @@ pub mod tests { ); let entry_3 = next_entry(&entry_2.hash, 1, vec![tx]); assert_eq!( - process_entries(&bank, &[entry_3]), + process_entries(&bank, &[entry_3], true), Err(TransactionError::AccountNotFound) ); } @@ -1335,7 +1455,7 @@ pub mod tests { ); assert_eq!( - process_entries(&bank, &[entry_1_to_mint]), + process_entries(&bank, &[entry_1_to_mint], false), Err(TransactionError::AccountInUse) ); @@ -1461,7 +1581,7 @@ pub mod tests { }) .collect(); info!("paying iteration {}", i); - process_entries(&bank, &entries).expect("paying failed"); + process_entries(&bank, &entries, true).expect("paying failed"); let entries: Vec<_> = (0..NUM_TRANSFERS) .map(|i| { @@ -1479,7 +1599,7 @@ pub mod tests { .collect(); info!("refunding iteration {}", i); - process_entries(&bank, &entries).expect("refunding failed"); + process_entries(&bank, &entries, true).expect("refunding failed"); // advance to next block process_entries( @@ -1487,6 +1607,7 @@ pub mod tests { &(0..bank.ticks_per_slot()) .map(|_| next_entry_mut(&mut hash, 1, vec![])) .collect::>(), + true, ) .expect("process ticks failed"); diff --git a/core/src/entry.rs b/core/src/entry.rs index 2ab139bb81..e156bf4504 100644 --- a/core/src/entry.rs +++ b/core/src/entry.rs @@ -189,7 +189,7 @@ pub fn hash_transactions(transactions: &[Transaction]) -> Hash { /// a signature, the final hash will be a hash of both the previous ID and /// the signature. If num_hashes is zero and there's no transaction data, /// start_hash is returned. -fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) -> Hash { +pub fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) -> Hash { if num_hashes == 0 && transactions.is_empty() { return *start_hash; } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 10b3ee9ec2..c03947f76c 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -726,7 +726,7 @@ impl ReplayStage { ); return Err(Error::BlobError(BlobError::VerificationFailed)); } - blocktree_processor::process_entries(bank, entries)?; + blocktree_processor::process_entries(bank, entries, true)?; Ok(()) } diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 8bd7d6407c..26e72f821e 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -743,6 +743,7 @@ mod tests { blocktree_processor::process_entries( &bank, &entry::create_ticks(64, bank.last_blockhash()), + true, ) .expect("failed process entries"); last_bank = Arc::new(bank); @@ -863,6 +864,7 @@ mod tests { DEFAULT_TICKS_PER_SLOT * next_bank.slots_per_segment() + 1, bank.last_blockhash(), ), + true, ) .unwrap(); let message = Message::new_with_payer(vec![mining_proof_ix], Some(&mint_keypair.pubkey())); diff --git a/runtime/benches/transaction_utils.rs b/runtime/benches/transaction_utils.rs new file mode 100644 index 0000000000..000b06d951 --- /dev/null +++ b/runtime/benches/transaction_utils.rs @@ -0,0 +1,19 @@ +#![feature(test)] + +extern crate test; + +use rand::seq::SliceRandom; +use rand::thread_rng; +use solana_runtime::transaction_utils::OrderedIterator; +use test::Bencher; + +#[bench] +fn bench_ordered_iterator_with_order_shuffling(bencher: &mut Bencher) { + let vec: Vec = (0..100_usize).collect(); + bencher.iter(|| { + let mut order: Vec = (0..100_usize).collect(); + order.shuffle(&mut thread_rng()); + let _ordered_iterator_resp: Vec<&usize> = + OrderedIterator::new(&vec, Some(&order)).collect(); + }); +} diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index a8f4dee3a1..66a5f5e9e3 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -23,6 +23,8 @@ use std::path::Path; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use crate::transaction_utils::OrderedIterator; + #[derive(Default, Debug)] struct CreditOnlyLock { credits: AtomicU64, @@ -210,6 +212,7 @@ impl Accounts { &self, ancestors: &HashMap, txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, lock_results: Vec>, hash_queue: &BlockhashQueue, error_counters: &mut ErrorCounters, @@ -226,7 +229,7 @@ impl Accounts { //TODO: two locks usually leads to deadlocks, should this be one structure? let accounts_index = self.accounts_db.accounts_index.read().unwrap(); let storage = self.accounts_db.storage.read().unwrap(); - txs.iter() + OrderedIterator::new(txs, txs_iteration_order) .zip(lock_results.into_iter()) .map(|etx| match etx { (tx, Ok(())) => { @@ -477,10 +480,13 @@ impl Accounts { /// This function will prevent multiple threads from modifying the same account state at the /// same time #[must_use] - pub fn lock_accounts(&self, txs: &[Transaction]) -> Vec> { + pub fn lock_accounts( + &self, + txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, + ) -> Vec> { let mut error_counters = ErrorCounters::default(); - let rv = txs - .iter() + let rv = OrderedIterator::new(txs, txs_iteration_order) .map(|tx| { let message = &tx.message(); Self::lock_account( @@ -521,6 +527,7 @@ impl Accounts { &self, fork: Fork, txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, res: &[Result<()>], loaded: &mut [Result<( TransactionAccounts, @@ -529,7 +536,8 @@ impl Accounts { TransactionRents, )>], ) { - let accounts_to_store = self.collect_accounts_to_store(txs, res, loaded); + let accounts_to_store = + self.collect_accounts_to_store(txs, txs_iteration_order, res, loaded); self.accounts_db.store(fork, &accounts_to_store); } @@ -601,6 +609,7 @@ impl Accounts { fn collect_accounts_to_store<'a>( &self, txs: &'a [Transaction], + txs_iteration_order: Option<&'a [usize]>, res: &'a [Result<()>], loaded: &'a mut [Result<( TransactionAccounts, @@ -610,12 +619,16 @@ impl Accounts { )>], ) -> Vec<(&'a Pubkey, &'a Account)> { let mut accounts = Vec::new(); - for (i, raccs) in loaded.iter_mut().enumerate() { + for (i, (raccs, tx)) in loaded + .iter_mut() + .zip(OrderedIterator::new(txs, txs_iteration_order)) + .enumerate() + { if res[i].is_err() || raccs.is_err() { continue; } - let message = &txs[i].message(); + let message = &tx.message(); let acc = raccs.as_mut().unwrap(); for (((i, key), account), credit) in message .account_keys @@ -700,6 +713,7 @@ mod tests { let res = accounts.load_accounts( &ancestors, &[tx], + None, vec![Ok(())], &hash_queue, error_counters, @@ -1276,7 +1290,7 @@ mod tests { instructions, ); let tx = Transaction::new(&[&keypair0], message, Hash::default()); - let results0 = accounts.lock_accounts(&[tx.clone()]); + let results0 = accounts.lock_accounts(&[tx.clone()], None); assert!(results0[0].is_ok()); assert_eq!( @@ -1315,7 +1329,7 @@ mod tests { ); let tx1 = Transaction::new(&[&keypair1], message, Hash::default()); let txs = vec![tx0, tx1]; - let results1 = accounts.lock_accounts(&txs); + let results1 = accounts.lock_accounts(&txs, None); assert!(results1[0].is_ok()); // Credit-only account (keypair1) can be referenced multiple times assert!(results1[1].is_err()); // Credit-only account (keypair1) cannot also be locked as credit-debit @@ -1347,7 +1361,7 @@ mod tests { instructions, ); let tx = Transaction::new(&[&keypair1], message, Hash::default()); - let results2 = accounts.lock_accounts(&[tx]); + let results2 = accounts.lock_accounts(&[tx], None); assert!(results2[0].is_ok()); // Now keypair1 account can be locked as credit-debit @@ -1409,7 +1423,7 @@ mod tests { let exit_clone = exit_clone.clone(); loop { let txs = vec![credit_debit_tx.clone()]; - let results = accounts_clone.clone().lock_accounts(&txs); + let results = accounts_clone.clone().lock_accounts(&txs, None); for result in results.iter() { if result.is_ok() { counter_clone.clone().fetch_add(1, Ordering::SeqCst); @@ -1424,7 +1438,7 @@ mod tests { let counter_clone = counter.clone(); for _ in 0..5 { let txs = vec![credit_only_tx.clone()]; - let results = accounts_arc.clone().lock_accounts(&txs); + let results = accounts_arc.clone().lock_accounts(&txs, None); if results[0].is_ok() { let counter_value = counter_clone.clone().load(Ordering::SeqCst); thread::sleep(time::Duration::from_millis(50)); @@ -1577,7 +1591,8 @@ mod tests { }, ); } - let collected_accounts = accounts.collect_accounts_to_store(&txs, &loaders, &mut loaded); + let collected_accounts = + accounts.collect_accounts_to_store(&txs, None, &loaders, &mut loaded); assert_eq!(collected_accounts.len(), 2); assert!(collected_accounts .iter() diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index d7a574992e..d47934a20d 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -2,6 +2,7 @@ //! programs. It offers a high-level API that signs transactions //! on behalf of the caller, and a low-level API for when they have //! already been signed and verified. +use crate::transaction_utils::OrderedIterator; use crate::{ accounts::{ Accounts, TransactionAccounts, TransactionCredits, TransactionLoaders, TransactionRents, @@ -674,9 +675,14 @@ impl Bank { } } - fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { + fn update_transaction_statuses( + &self, + txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, + res: &[Result<()>], + ) { let mut status_cache = self.src.status_cache.write().unwrap(); - for (i, tx) in txs.iter().enumerate() { + for (i, tx) in OrderedIterator::new(txs, txs_iteration_order).enumerate() { if Self::can_commit(&res[i]) && !tx.signatures.is_empty() { status_cache.insert( &tx.message().recent_blockhash, @@ -763,13 +769,14 @@ impl Bank { pub fn lock_accounts<'a, 'b>( &'a self, txs: &'b [Transaction], + txs_iteration_order: Option<&[usize]>, ) -> LockedAccountsResults<'a, 'b> { if self.is_frozen() { warn!("=========== FIXME: lock_accounts() working on a frozen bank! ================"); } // TODO: put this assert back in // assert!(!self.is_frozen()); - let results = self.rc.accounts.lock_accounts(txs); + let results = self.rc.accounts.lock_accounts(txs, txs_iteration_order); LockedAccountsResults::new(results, &self, txs) } @@ -786,6 +793,7 @@ impl Bank { fn load_accounts( &self, txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, results: Vec>, error_counters: &mut ErrorCounters, ) -> Vec< @@ -799,6 +807,7 @@ impl Bank { self.rc.accounts.load_accounts( &self.ancestors, txs, + txs_iteration_order, results, &self.blockhash_queue.read().unwrap(), error_counters, @@ -808,10 +817,11 @@ impl Bank { fn check_refs( &self, txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, lock_results: &[Result<()>], error_counters: &mut ErrorCounters, ) -> Vec> { - txs.iter() + OrderedIterator::new(txs, txs_iteration_order) .zip(lock_results) .map(|(tx, lock_res)| { if lock_res.is_ok() && !tx.verify_refs() { @@ -826,12 +836,13 @@ impl Bank { fn check_age( &self, txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, lock_results: Vec>, max_age: usize, error_counters: &mut ErrorCounters, ) -> Vec> { let hash_queue = self.blockhash_queue.read().unwrap(); - txs.iter() + OrderedIterator::new(txs, txs_iteration_order) .zip(lock_results.into_iter()) .map(|(tx, lock_res)| { if lock_res.is_ok() @@ -848,11 +859,12 @@ impl Bank { fn check_signatures( &self, txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, lock_results: Vec>, error_counters: &mut ErrorCounters, ) -> Vec> { let rcache = self.src.status_cache.read().unwrap(); - txs.iter() + OrderedIterator::new(txs, txs_iteration_order) .zip(lock_results.into_iter()) .map(|(tx, lock_res)| { if tx.signatures.is_empty() { @@ -886,13 +898,21 @@ impl Bank { pub fn check_transactions( &self, txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, lock_results: &[Result<()>], max_age: usize, mut error_counters: &mut ErrorCounters, ) -> Vec> { - let refs_results = self.check_refs(txs, lock_results, &mut error_counters); - let age_results = self.check_age(txs, refs_results, max_age, &mut error_counters); - self.check_signatures(txs, age_results, &mut error_counters) + let refs_results = + self.check_refs(txs, txs_iteration_order, lock_results, &mut error_counters); + let age_results = self.check_age( + txs, + txs_iteration_order, + refs_results, + max_age, + &mut error_counters, + ); + self.check_signatures(txs, txs_iteration_order, age_results, &mut error_counters) } fn update_error_counters(error_counters: &ErrorCounters) { @@ -944,6 +964,7 @@ impl Bank { pub fn load_and_execute_transactions( &self, txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, lock_results: &LockedAccountsResults, max_age: usize, ) -> ( @@ -965,31 +986,32 @@ impl Bank { let mut error_counters = ErrorCounters::default(); let mut load_time = Measure::start("accounts_load"); - let retryable_txs: Vec<_> = lock_results - .locked_accounts_results() - .iter() - .enumerate() - .filter_map(|(index, res)| match res { - Err(TransactionError::AccountInUse) => Some(index), - Ok(_) => None, - Err(_) => None, - }) - .collect(); + let retryable_txs: Vec<_> = + OrderedIterator::new(lock_results.locked_accounts_results(), txs_iteration_order) + .enumerate() + .filter_map(|(index, res)| match res { + Err(TransactionError::AccountInUse) => Some(index), + Ok(_) => None, + Err(_) => None, + }) + .collect(); let sig_results = self.check_transactions( txs, + txs_iteration_order, lock_results.locked_accounts_results(), max_age, &mut error_counters, ); - let mut loaded_accounts = self.load_accounts(txs, sig_results, &mut error_counters); + let mut loaded_accounts = + self.load_accounts(txs, txs_iteration_order, sig_results, &mut error_counters); load_time.stop(); let mut execution_time = Measure::start("execution_time"); let mut signature_count = 0; let executed: Vec> = loaded_accounts .iter_mut() - .zip(txs.iter()) + .zip(OrderedIterator::new(txs, txs_iteration_order)) .map(|(accs, tx)| match accs { Err(e) => Err(e.clone()), Ok((ref mut accounts, ref mut loaders, ref mut credits, ref mut _rents)) => { @@ -1042,12 +1064,12 @@ impl Bank { fn filter_program_errors_and_collect_fee( &self, txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, executed: &[Result<()>], ) -> Vec> { let hash_queue = self.blockhash_queue.read().unwrap(); let mut fees = 0; - let results = txs - .iter() + let results = OrderedIterator::new(txs, txs_iteration_order) .zip(executed.iter()) .map(|(tx, res)| { let fee_calculator = hash_queue @@ -1082,6 +1104,7 @@ impl Bank { pub fn commit_transactions( &self, txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, loaded_accounts: &mut [Result<( TransactionAccounts, TransactionLoaders, @@ -1109,17 +1132,21 @@ impl Bank { // TODO: put this assert back in // assert!(!self.is_frozen()); let mut write_time = Measure::start("write_time"); - self.rc - .accounts - .store_accounts(self.slot(), txs, executed, loaded_accounts); + self.rc.accounts.store_accounts( + self.slot(), + txs, + txs_iteration_order, + executed, + loaded_accounts, + ); - self.update_cached_accounts(txs, executed, loaded_accounts); + self.update_cached_accounts(txs, txs_iteration_order, executed, loaded_accounts); // once committed there is no way to unroll write_time.stop(); debug!("store: {}us txs_len={}", write_time.as_us(), txs.len(),); - self.update_transaction_statuses(txs, &executed); - self.filter_program_errors_and_collect_fee(txs, executed) + self.update_transaction_statuses(txs, txs_iteration_order, &executed); + self.filter_program_errors_and_collect_fee(txs, txs_iteration_order, executed) } /// Process a batch of transactions. @@ -1127,14 +1154,16 @@ impl Bank { pub fn load_execute_and_commit_transactions( &self, txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, lock_results: &LockedAccountsResults, max_age: usize, ) -> Vec> { let (mut loaded_accounts, executed, _, tx_count, signature_count) = - self.load_and_execute_transactions(txs, lock_results, max_age); + self.load_and_execute_transactions(txs, txs_iteration_order, lock_results, max_age); self.commit_transactions( txs, + txs_iteration_order, &mut loaded_accounts, &executed, tx_count, @@ -1144,8 +1173,8 @@ impl Bank { #[must_use] pub fn process_transactions(&self, txs: &[Transaction]) -> Vec> { - let lock_results = self.lock_accounts(txs); - self.load_execute_and_commit_transactions(txs, &lock_results, MAX_RECENT_BLOCKHASHES) + let lock_results = self.lock_accounts(txs, None); + self.load_execute_and_commit_transactions(txs, None, &lock_results, MAX_RECENT_BLOCKHASHES) } /// Create, sign, and process a Transaction from `keypair` to `to` of @@ -1359,6 +1388,7 @@ impl Bank { fn update_cached_accounts( &self, txs: &[Transaction], + txs_iteration_order: Option<&[usize]>, res: &[Result<()>], loaded: &[Result<( TransactionAccounts, @@ -1367,12 +1397,16 @@ impl Bank { TransactionRents, )>], ) { - for (i, raccs) in loaded.iter().enumerate() { + for (i, (raccs, tx)) in loaded + .iter() + .zip(OrderedIterator::new(txs, txs_iteration_order)) + .enumerate() + { if res[i].is_err() || raccs.is_err() { continue; } - let message = &txs[i].message(); + let message = &tx.message(); let acc = raccs.as_ref().unwrap(); for (pubkey, account) in @@ -1986,7 +2020,7 @@ mod tests { ]; let initial_balance = bank.get_balance(&leader); - let results = bank.filter_program_errors_and_collect_fee(&vec![tx1, tx2], &results); + let results = bank.filter_program_errors_and_collect_fee(&vec![tx1, tx2], None, &results); bank.freeze(); assert_eq!( bank.get_balance(&leader), @@ -2091,9 +2125,10 @@ mod tests { ); let pay_alice = vec![tx1]; - let lock_result = bank.lock_accounts(&pay_alice); + let lock_result = bank.lock_accounts(&pay_alice, None); let results_alice = bank.load_execute_and_commit_transactions( &pay_alice, + None, &lock_result, MAX_RECENT_BLOCKHASHES, ); @@ -2140,7 +2175,7 @@ mod tests { let tx = Transaction::new(&[&key0], message, genesis_block.hash()); let txs = vec![tx]; - let lock_result0 = bank.lock_accounts(&txs); + let lock_result0 = bank.lock_accounts(&txs, None); assert!(lock_result0.locked_accounts_results()[0].is_ok()); // Try locking accounts, locking a previously credit-only account as credit-debit @@ -2158,7 +2193,7 @@ mod tests { let tx = Transaction::new(&[&key1], message, genesis_block.hash()); let txs = vec![tx]; - let lock_result1 = bank.lock_accounts(&txs); + let lock_result1 = bank.lock_accounts(&txs, None); assert!(lock_result1.locked_accounts_results()[0].is_err()); // Try locking a previously credit-only account a 2nd time; should succeed @@ -2175,7 +2210,7 @@ mod tests { let tx = Transaction::new(&[&key2], message, genesis_block.hash()); let txs = vec![tx]; - let lock_result2 = bank.lock_accounts(&txs); + let lock_result2 = bank.lock_accounts(&txs, None); assert!(lock_result2.locked_accounts_results()[0].is_ok()); } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index b84b0d1610..7c25cf6598 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -18,6 +18,7 @@ pub mod stakes; pub mod status_cache; pub mod storage_utils; mod system_instruction_processor; +pub mod transaction_utils; #[macro_use] extern crate solana_metrics; diff --git a/runtime/src/locked_accounts_results.rs b/runtime/src/locked_accounts_results.rs index 437d249448..ed228fef95 100644 --- a/runtime/src/locked_accounts_results.rs +++ b/runtime/src/locked_accounts_results.rs @@ -54,7 +54,7 @@ mod tests { let (bank, txs) = setup(); // Test getting locked accounts - let lock_results = bank.lock_accounts(&txs); + let lock_results = bank.lock_accounts(&txs, None); // Grab locks assert!(lock_results @@ -63,7 +63,7 @@ mod tests { .all(|x| x.is_ok())); // Trying to grab locks again should fail - let lock_results2 = bank.lock_accounts(&txs); + let lock_results2 = bank.lock_accounts(&txs, None); assert!(lock_results2 .locked_accounts_results() .iter() @@ -73,7 +73,7 @@ mod tests { drop(lock_results); // Now grabbing locks should work again - let lock_results2 = bank.lock_accounts(&txs); + let lock_results2 = bank.lock_accounts(&txs, None); assert!(lock_results2 .locked_accounts_results() .iter() diff --git a/runtime/src/transaction_utils.rs b/runtime/src/transaction_utils.rs new file mode 100644 index 0000000000..8068c6a622 --- /dev/null +++ b/runtime/src/transaction_utils.rs @@ -0,0 +1,73 @@ +use std::ops::Index; + +/// OrderedIterator allows iterating with specific order specified +pub struct OrderedIterator<'a, T: 'a> { + element_order: Option<&'a [usize]>, + current: usize, + vec: &'a [T], +} + +impl<'a, T> OrderedIterator<'a, T> { + pub fn new(vec: &'a [T], element_order: Option<&'a [usize]>) -> OrderedIterator<'a, T> { + if let Some(custom_order) = element_order { + assert!(custom_order.len() == vec.len()); + } + OrderedIterator { + element_order, + current: 0, + vec, + } + } +} + +impl<'a, T> Iterator for OrderedIterator<'a, T> { + type Item = &'a T; + fn next(&mut self) -> Option { + if self.current >= self.vec.len() { + None + } else { + let index: usize; + if let Some(custom_order) = self.element_order { + index = custom_order[self.current]; + } else { + index = self.current; + } + self.current += 1; + Some(self.vec.index(index)) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ordered_iterator_custom_order() { + let vec: Vec = vec![1, 2, 3, 4]; + let custom_order: Vec = vec![3, 1, 0, 2]; + + let ordered_iterator = OrderedIterator::new(&vec, Some(&custom_order)); + let expected_response: Vec = vec![4, 2, 1, 3]; + + let resp: Vec<(&usize, &usize)> = ordered_iterator + .zip(expected_response.iter()) + .filter(|(actual_elem, expected_elem)| *actual_elem == *expected_elem) + .collect(); + + assert_eq!(resp.len(), custom_order.len()); + } + + #[test] + fn test_ordered_iterator_original_order() { + let vec: Vec = vec![1, 2, 3, 4]; + let ordered_iterator = OrderedIterator::new(&vec, None); + + let resp: Vec<(&usize, &usize)> = ordered_iterator + .zip(vec.iter()) + .filter(|(actual_elem, expected_elem)| *actual_elem == *expected_elem) + .collect(); + + assert_eq!(resp.len(), vec.len()); + } +}