From e6030d66eb00837b641417fe87953cf0d16440d1 Mon Sep 17 00:00:00 2001
From: Rob Walker
Date: Mon, 21 Jan 2019 10:17:04 -0800
Subject: [PATCH] split load+execute from commit in bank, insert record between
 them in TPU code (#2487)

* split load+execute from commit in bank, insert record between them in TPU code

* clippy

* remove clear_signatures() race with commit_transactions()

* add #[test] back
---
 benches/banking_stage.rs |  24 +++++--
 src/bank.rs              | 152 ++++++++++++++++++++++++++++++++-------
 2 files changed, 142 insertions(+), 34 deletions(-)

diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs
index 3bbb3873bc..869215f911 100644
--- a/benches/banking_stage.rs
+++ b/benches/banking_stage.rs
@@ -42,7 +42,8 @@ fn check_txs(receiver: &Receiver<Vec<Entry>>, ref_tx_count: usize) {
 #[bench]
 fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
     let num_threads = BankingStage::num_threads() as usize;
-    let txes = 1000 * num_threads;
+    // a multiple of packet chunk 2X duplicates to avoid races
+    let txes = 192 * 50 * num_threads * 2;
     let mint_total = 1_000_000_000_000;
     let mint = Mint::new(mint_total);
@@ -115,14 +116,18 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
         bank.register_tick(&id);
     }
+    let half_len = verified.len() / 2;
+    let mut start = 0;
     bencher.iter(move || {
-        // make sure the tx last id is still registered
+        // make sure the transactions are still valid
        bank.register_tick(&mint.last_id());
-        for v in verified.chunks(verified.len() / num_threads) {
+        for v in verified[start..start + half_len].chunks(verified.len() / num_threads) {
             verified_sender.send(v.to_vec()).unwrap();
         }
-        check_txs(&signal_receiver, txes);
+        check_txs(&signal_receiver, txes / 2);
         bank.clear_signatures();
+        start += half_len;
+        start %= verified.len();
     });
 }
@@ -130,7 +135,8 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
     let progs = 4;
     let num_threads = BankingStage::num_threads() as usize;
-    let txes = 1000 * num_threads;
+    // a multiple of packet chunk 2X duplicates to avoid races
+    let txes = 96 * 100 * num_threads * 2;
     let mint_total = 1_000_000_000_000;
     let mint = Mint::new(mint_total);
@@ -218,13 +224,17 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
         bank.register_tick(&id);
     }
+    let half_len = verified.len() / 2;
+    let mut start = 0;
     bencher.iter(move || {
         // make sure the transactions are still valid
         bank.register_tick(&mint.last_id());
-        for v in verified.chunks(verified.len() / num_threads) {
+        for v in verified[start..start + half_len].chunks(verified.len() / num_threads) {
             verified_sender.send(v.to_vec()).unwrap();
         }
-        check_txs(&signal_receiver, txes);
+        check_txs(&signal_receiver, txes / 2);
         bank.clear_signatures();
+        start += half_len;
+        start %= verified.len();
     });
 }
diff --git a/src/bank.rs b/src/bank.rs
index b7f2cd95f5..155522901d 100644
--- a/src/bank.rs
+++ b/src/bank.rs
@@ -404,26 +404,38 @@ impl Bank {
         // same account state
         let lock_results = self.lock_accounts(txs);
         let lock_time = now.elapsed();
+        let now = Instant::now();
         // Use a shorter maximum age when adding transactions into the pipeline. This will reduce
         // the likelihood of any single thread getting starved and processing old ids.
         // TODO: Banking stage threads should be prioritized to complete faster then this queue
         // expires.
-        let results =
-            self.execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS as usize / 2);
-        let process_time = now.elapsed();
-        let now = Instant::now();
-        self.record_transactions(txs, &results, poh)?;
-        let record_time = now.elapsed();
+        let (loaded_accounts, results) =
+            self.load_and_execute_transactions(txs, lock_results, MAX_ENTRY_IDS as usize / 2);
+        let load_execute_time = now.elapsed();
+
+        let record_time = {
+            let now = Instant::now();
+            self.record_transactions(txs, &results, poh)?;
+            now.elapsed()
+        };
+
+        let commit_time = {
+            let now = Instant::now();
+            self.commit_transactions(txs, &loaded_accounts, &results);
+            now.elapsed()
+        };
+
         let now = Instant::now();
         // Once the accounts are new transactions can enter the pipeline to process them
         self.unlock_accounts(&txs, &results);
         let unlock_time = now.elapsed();
         debug!(
-            "lock: {}us process: {}us record: {}us unlock: {}us txs_len={}",
+            "lock: {}us load_execute: {}us record: {}us commit: {}us unlock: {}us txs_len: {}",
             duration_as_us(&lock_time),
-            duration_as_us(&process_time),
+            duration_as_us(&load_execute_time),
             duration_as_us(&record_time),
+            duration_as_us(&commit_time),
             duration_as_us(&unlock_time),
             txs.len(),
         );
@@ -447,10 +459,10 @@ impl Bank {
                 }
             })
             .collect();
+        debug!("processed: {} ", processed_transactions.len());
         // unlock all the accounts with errors which are filtered by the above `filter_map`
         if !processed_transactions.is_empty() {
             let hash = Transaction::hash(&processed_transactions);
-            debug!("processed ok: {} {}", processed_transactions.len(), hash);
             // record and unlock will unlock all the successfull transactions
             poh.record(hash, processed_transactions).map_err(|e| {
                 warn!("record failure: {:?}", e);
@@ -472,14 +484,16 @@ impl Bank {
             .load_accounts(txs, &mut last_ids, lock_results, max_age, error_counters)
     }
 
-    /// Process a batch of transactions.
-    #[must_use]
-    pub fn execute_and_commit_transactions(
+    #[allow(clippy::type_complexity)]
+    fn load_and_execute_transactions(
         &self,
         txs: &[Transaction],
         lock_results: Vec<Result<()>>,
         max_age: usize,
-    ) -> Vec<Result<()>> {
+    ) -> (
+        Vec<Result<(InstructionAccounts, InstructionLoaders)>>,
+        Vec<Result<()>>,
+    ) {
         debug!("processing transactions: {}", txs.len());
         let mut error_counters = ErrorCounters::default();
         let now = Instant::now();
@@ -505,23 +519,13 @@ impl Bank {
             .collect();
         let execution_elapsed = now.elapsed();
 
-        let now = Instant::now();
-        self.accounts
-            .store_accounts(txs, &executed, &loaded_accounts);
-        // Check account subscriptions and send notifications
-        self.send_account_notifications(txs, &executed, &loaded_accounts);
-
-        // once committed there is no way to unroll
-        let write_elapsed = now.elapsed();
         debug!(
-            "load: {}us execute: {}us store: {}us txs_len={}",
+            "load: {}us execute: {}us txs_len={}",
             duration_as_us(&load_elapsed),
             duration_as_us(&execution_elapsed),
-            duration_as_us(&write_elapsed),
             txs.len(),
         );
-        self.update_transaction_statuses(txs, &executed);
         let mut tx_count = 0;
         let mut err_count = 0;
         for (r, tx) in executed.iter().zip(txs.iter()) {
@@ -570,13 +574,50 @@ impl Bank {
                 error_counters.insufficient_funds
             );
         }
+        (loaded_accounts, executed)
+    }
+
+    fn commit_transactions(
+        &self,
+        txs: &[Transaction],
+        loaded_accounts: &[Result<(InstructionAccounts, InstructionLoaders)>],
+        executed: &[Result<()>],
+    ) {
+        let now = Instant::now();
+        self.accounts.store_accounts(txs, executed, loaded_accounts);
+
+        // Check account subscriptions and send notifications
+        self.send_account_notifications(txs, executed, loaded_accounts);
+
+        // once committed there is no way to unroll
+        let write_elapsed = now.elapsed();
+        debug!(
+            "store: {}us txs_len={}",
+            duration_as_us(&write_elapsed),
+            txs.len(),
+        );
+        self.update_transaction_statuses(txs, &executed);
+    }
+
+    /// Process a batch of transactions.
+    #[must_use]
+    pub fn load_execute_and_commit_transactions(
+        &self,
+        txs: &[Transaction],
+        lock_results: Vec<Result<()>>,
+        max_age: usize,
+    ) -> Vec<Result<()>> {
+        let (loaded_accounts, executed) =
+            self.load_and_execute_transactions(txs, lock_results, max_age);
+
+        self.commit_transactions(txs, &loaded_accounts, &executed);
         executed
     }
 
     #[must_use]
     pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> {
         let lock_results = self.lock_accounts(txs);
-        let results = self.execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS);
+        let results = self.load_execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS);
         self.unlock_accounts(txs, &results);
         results
     }
@@ -618,7 +659,7 @@ impl Bank {
         let results: Vec<Result<()>> = entries
             .into_par_iter()
             .map(|(e, lock_results)| {
-                let results = self.execute_and_commit_transactions(
+                let results = self.load_execute_and_commit_transactions(
                     &e.transactions,
                     lock_results.to_vec(),
                     MAX_ENTRY_IDS,
@@ -1354,7 +1395,7 @@ mod tests {
         let lock_result = bank.lock_accounts(&pay_alice);
         let results_alice =
-            bank.execute_and_commit_transactions(&pay_alice, lock_result, MAX_ENTRY_IDS);
+            bank.load_execute_and_commit_transactions(&pay_alice, lock_result, MAX_ENTRY_IDS);
         assert_eq!(results_alice[0], Ok(()));
 
         // try executing an interleaved transfer twice
@@ -1871,4 +1912,61 @@ mod tests {
         assert_eq!(bank.get_storage_last_id(), storage_last_id);
         assert_eq!(bank.get_pubkeys_for_entry_height(0), vec![]);
     }
+
+    #[test]
+    fn test_bank_process_and_record_transactions() {
+        let mint = Mint::new(10_000);
+        let bank = Arc::new(Bank::new(&mint));
+        let pubkey = Keypair::new().pubkey();
+
+        let transactions = vec![Transaction::system_move(
+            &mint.keypair(),
+            pubkey,
+            1,
+            mint.last_id(),
+            0,
+        )];
+
+        let (entry_sender, entry_receiver) = channel();
+        let mut poh_recorder = PohRecorder::new(
+            bank.clone(),
+            entry_sender,
+            bank.last_id(),
+            Some(bank.tick_height() + 1),
+        );
+
+        bank.process_and_record_transactions(&transactions, &poh_recorder)
+            .unwrap();
+        poh_recorder.tick().unwrap();
+
+        let mut need_tick = true;
+        // read entries until I find mine, might be ticks...
+        while need_tick {
+            let entries = entry_receiver.recv().unwrap();
+            for entry in entries {
+                if !entry.is_tick() {
+                    assert_eq!(entry.transactions.len(), transactions.len());
+                    assert_eq!(bank.get_balance(&pubkey), 1);
+                } else {
+                    need_tick = false;
+                }
+            }
+        }
+
+        let transactions = vec![Transaction::system_move(
+            &mint.keypair(),
+            pubkey,
+            2,
+            mint.last_id(),
+            0,
+        )];
+
+        assert_eq!(
+            bank.process_and_record_transactions(&transactions, &poh_recorder),
+            Err(BankError::RecordFailure)
+        );
+
+        assert_eq!(bank.get_balance(&pubkey), 1);
+    }
 }
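
Taken together, the bank half of this patch turns the old single-shot execute_and_commit_transactions() into three phases. Stripped of the timing instrumentation and debug!() logging, the new body of Bank::process_and_record_transactions reduces to roughly the sketch below. This is a simplified restatement of the diff above, not standalone code: it assumes the surrounding Bank impl, PohRecorder, and MAX_ENTRY_IDS from this tree, and the exact parameter and return types are inferred from the call sites and the new test rather than shown in the hunks.

    pub fn process_and_record_transactions(
        &self,
        txs: &[Transaction],
        poh: &PohRecorder,
    ) -> Result<()> {
        // Take the account locks so no other thread mutates the same account state.
        let lock_results = self.lock_accounts(txs);

        // Phase 1: load and execute against copies of the accounts; nothing is
        // written back yet, so a record failure cannot leave partial state behind.
        let (loaded_accounts, results) =
            self.load_and_execute_transactions(txs, lock_results, MAX_ENTRY_IDS as usize / 2);

        // Phase 2: record the executed transactions in the PoH stream; an error
        // here returns before any state is committed.
        self.record_transactions(txs, &results, poh)?;

        // Phase 3: commit -- store the new account state, notify subscribers, and
        // update transaction statuses ("once committed there is no way to unroll").
        self.commit_transactions(txs, &loaded_accounts, &results);

        // Release the locks so the next batch can enter the pipeline.
        self.unlock_accounts(&txs, &results);
        Ok(())
    }

On the bench side, each iteration now submits only half of the pre-generated transactions and rotates through the set; per the added comments and the commit message, this avoids re-submitting a batch that races with clear_signatures().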