From 97c97db97e8b5c81345336279a87511b5071308f Mon Sep 17 00:00:00 2001 From: carllin Date: Wed, 26 Jun 2019 22:39:50 -0700 Subject: [PATCH] Fix early exit clearing all buffered packets (#4810) --- core/src/banking_stage.rs | 208 +++++++++++++++++++++++++++++--------- runtime/src/bank.rs | 17 +++- 2 files changed, 175 insertions(+), 50 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index ccc3841527..11f4461f4a 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -28,7 +28,7 @@ use solana_sdk::timing::{ }; use solana_sdk::transaction::{self, Transaction, TransactionError}; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, Builder, JoinHandle}; @@ -85,7 +85,6 @@ impl BankingStage { // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its blockhash is registered with the bank. - let exit = Arc::new(AtomicBool::new(false)); let my_pubkey = cluster_info.read().unwrap().id(); // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..num_threads) @@ -99,7 +98,6 @@ impl BankingStage { let poh_recorder = poh_recorder.clone(); let cluster_info = cluster_info.clone(); - let exit = exit.clone(); let mut recv_start = Instant::now(); Builder::new() .name("solana-banking-stage-tx".to_string()) @@ -113,7 +111,6 @@ impl BankingStage { enable_forwarding, i, ); - exit.store(true, Ordering::Relaxed); }) .unwrap() }) @@ -171,7 +168,7 @@ impl BankingStage { &poh_recorder, &msgs, unprocessed_indexes.to_owned(), - )?; + ); new_tx_count += processed; @@ -358,8 +355,7 @@ impl BankingStage { buffered_packets.append(&mut unprocessed_packets); } Err(err) => { - debug!("solana-banking-stage-tx: exit due to {:?}", err); - break; + debug!("solana-banking-stage-tx error: {:?}", err); } } } @@ -377,16 +373,22 @@ impl BankingStage { .collect() } + #[allow(clippy::match_wild_err_arm)] fn record_transactions( bank_slot: u64, txs: &[Transaction], results: &[transaction::Result<()>], poh: &Arc>, - ) -> Result<()> { + ) -> (Result<()>, Vec) { + let mut ok_txs = vec![]; let processed_transactions: Vec<_> = results .iter() .zip(txs.iter()) - .filter_map(|(r, x)| { + .enumerate() + .filter_map(|(i, (r, x))| { + if r.is_ok() { + ok_txs.push(i); + } if Bank::can_commit(r) { Some(x.clone()) } else { @@ -394,6 +396,7 @@ impl BankingStage { } }) .collect(); + debug!("processed: {} ", processed_transactions.len()); // unlock all the accounts with errors which are filtered by the above `filter_map` if !processed_transactions.is_empty() { @@ -401,13 +404,24 @@ impl BankingStage { "banking_stage-record_transactions", processed_transactions.len() ); - let hash = hash_transactions(&processed_transactions); + let hash = hash_transactions(&processed_transactions[..]); // record and unlock will unlock all the successful transactions - poh.lock() + let res = poh + .lock() .unwrap() - .record(bank_slot, hash, processed_transactions)?; + .record(bank_slot, hash, processed_transactions); + + match res { + Ok(()) => (), + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => { + // If record errors, return all the ok transactions as retryable, filter out + // all the transactions we know were errors + return (res, ok_txs); + } + Err(_) => panic!("Poh recorder returned unexpected error"), + } } - Ok(()) + (Ok(()), vec![]) } fn process_and_record_transactions_locked( @@ -415,13 +429,13 @@ impl BankingStage { txs: &[Transaction], poh: &Arc>, lock_results: &LockedAccountsResults, - ) -> Result<()> { + ) -> (Result<()>, Vec) { let now = Instant::now(); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce // the likelihood of any single thread getting starved and processing old ids. // TODO: Banking stage threads should be prioritized to complete faster then this queue // expires. - let (loaded_accounts, results) = + let (loaded_accounts, results, mut retryable_txs) = bank.load_and_execute_transactions(txs, lock_results, MAX_PROCESSING_AGE); let load_execute_time = now.elapsed(); @@ -429,7 +443,12 @@ impl BankingStage { let record_time = { let now = Instant::now(); - Self::record_transactions(bank.slot(), txs, &results, poh)?; + let (res, retryable_record_txs) = + Self::record_transactions(bank.slot(), txs, &results, poh); + retryable_txs.extend(retryable_record_txs); + if res.is_err() { + return (res, retryable_txs); + } now.elapsed() }; @@ -450,7 +469,7 @@ impl BankingStage { txs.len(), ); - Ok(()) + (Ok(()), retryable_txs) } pub fn process_and_record_transactions( @@ -465,18 +484,9 @@ impl BankingStage { let lock_results = bank.lock_accounts(txs); let lock_time = now.elapsed(); - let unprocessed_txs: Vec<_> = lock_results - .locked_accounts_results() - .iter() - .zip(chunk_offset..) - .filter_map(|(res, index)| match res { - Err(TransactionError::AccountInUse) => Some(index), - Ok(_) => None, - Err(_) => None, - }) - .collect(); - - let results = Self::process_and_record_transactions_locked(bank, txs, poh, &lock_results); + let (result, mut retryable_txs) = + Self::process_and_record_transactions_locked(bank, txs, poh, &lock_results); + retryable_txs.iter_mut().for_each(|x| *x += chunk_offset); let now = Instant::now(); // Once the accounts are new transactions can enter the pipeline to process them @@ -491,7 +501,7 @@ impl BankingStage { txs.len(), ); - (results, unprocessed_txs) + (result, retryable_txs) } /// Sends transactions to the bank. @@ -502,7 +512,7 @@ impl BankingStage { bank: &Bank, transactions: &[Transaction], poh: &Arc>, - ) -> Result<(usize, Vec)> { + ) -> (usize, Vec) { let mut chunk_start = 0; let mut unprocessed_txs = vec![]; while chunk_start != transactions.len() { @@ -513,30 +523,38 @@ impl BankingStage { &Entry::serialized_to_blob_size, ); - let (result, unprocessed_txs_in_chunk) = Self::process_and_record_transactions( + let (result, retryable_txs_in_chunk) = Self::process_and_record_transactions( bank, &transactions[chunk_start..chunk_end], poh, chunk_start, ); - trace!("process_transactions: {:?}", result); - unprocessed_txs.extend_from_slice(&unprocessed_txs_in_chunk); + trace!("process_transactions result: {:?}", result); + + // Add the retryable txs (transactions that errored in a way that warrants a retry) + // to the list of unprocessed txs. + unprocessed_txs.extend_from_slice(&retryable_txs_in_chunk); if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result { info!( "process transactions: max height reached slot: {} height: {}", bank.slot(), bank.tick_height() ); - let range: Vec = (chunk_start..chunk_end).collect(); + // process_and_record_transactions has returned all retryable errors in + // transactions[chunk_start..chunk_end], so we just need to push the remaining + // transactions into the unprocessed queue. + let range: Vec = (chunk_end..transactions.len()).collect(); unprocessed_txs.extend_from_slice(&range); unprocessed_txs.sort_unstable(); unprocessed_txs.dedup(); break; } - result?; + // Don't exit early on any other type of error, continue processing... + chunk_start = chunk_end; } - Ok((chunk_start, unprocessed_txs)) + + (chunk_start, unprocessed_txs) } // This function returns a vector of transactions that are not None. It also returns a vector @@ -601,7 +619,7 @@ impl BankingStage { Self::filter_transaction_indexes(transactions, &transaction_indexes) } - // This function filters pending transactions that are still valid + // This function filters pending transactions that are still valid fn filter_pending_transactions( bank: &Arc, transactions: &[Transaction], @@ -636,7 +654,7 @@ impl BankingStage { poh: &Arc>, msgs: &Packets, transaction_indexes: Vec, - ) -> Result<(usize, usize, Vec)> { + ) -> (usize, usize, Vec) { let (transactions, transaction_indexes) = Self::transactions_from_packets(msgs, &transaction_indexes); debug!( @@ -648,7 +666,7 @@ impl BankingStage { let tx_len = transactions.len(); let (processed, unprocessed_tx_indexes) = - Self::process_transactions(bank, &transactions, poh)?; + Self::process_transactions(bank, &transactions, poh); let unprocessed_tx_count = unprocessed_tx_indexes.len(); @@ -663,7 +681,7 @@ impl BankingStage { unprocessed_tx_count.saturating_sub(filtered_unprocessed_tx_indexes.len()) ); - Ok((processed, tx_len, filtered_unprocessed_tx_indexes)) + (processed, tx_len, filtered_unprocessed_tx_indexes) } fn filter_unprocessed_packets( @@ -746,7 +764,7 @@ impl BankingStage { let bank = bank.unwrap(); let (processed, verified_txs_len, unprocessed_indexes) = - Self::process_received_packets(&bank, &poh, &msgs, packet_indexes)?; + Self::process_received_packets(&bank, &poh, &msgs, packet_indexes); new_tx_count += processed; @@ -1191,8 +1209,7 @@ mod tests { ]; let mut results = vec![Ok(()), Ok(())]; - BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) - .unwrap(); + BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder); let (_, entries) = entry_receiver.recv().unwrap(); assert_eq!(entries[0].0.transactions.len(), transactions.len()); @@ -1201,17 +1218,48 @@ mod tests { 1, InstructionError::new_result_with_negative_lamports(), )); - BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) - .unwrap(); + let (res, retryable) = BankingStage::record_transactions( + bank.slot(), + &transactions, + &results, + &poh_recorder, + ); + res.unwrap(); + assert!(retryable.is_empty()); let (_, entries) = entry_receiver.recv().unwrap(); assert_eq!(entries[0].0.transactions.len(), transactions.len()); // Other TransactionErrors should not be recorded results[0] = Err(TransactionError::AccountNotFound); - BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) - .unwrap(); + let (res, retryable) = BankingStage::record_transactions( + bank.slot(), + &transactions, + &results, + &poh_recorder, + ); + res.unwrap(); + assert!(retryable.is_empty()); let (_, entries) = entry_receiver.recv().unwrap(); assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1); + + // Once bank is set to a new bank (setting bank.slot() + 1 in record_transactions), + // record_transactions should throw MaxHeightReached and return the set of retryable + // txs + let (res, retryable) = BankingStage::record_transactions( + bank.slot() + 1, + &transactions, + &results, + &poh_recorder, + ); + assert_matches!( + res, + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) + ); + // The first result was an error so it's filtered out. The second result was Ok(), + // so it should be marked as retryable + assert_eq!(retryable, vec![1]); + // Should receive nothing from PohRecorder b/c record failed + assert!(entry_receiver.try_recv().is_err()); } Blocktree::destroy(&ledger_path).unwrap(); } @@ -1617,4 +1665,68 @@ mod tests { }) .collect_vec(); } + + #[test] + fn test_process_transactions_returns_unprocessed_txs() { + solana_logger::setup(); + let GenesisBlockInfo { + genesis_block, + mint_keypair, + .. + } = create_genesis_block(10_000); + let bank = Arc::new(Bank::new(&genesis_block)); + let mut transactions = vec![]; + + loop { + let pubkey = Pubkey::new_rand(); + // Make enough transactions to span multiple entries + transactions.push(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + )); + + if entry::num_will_fit( + &transactions[0..], + packet::BLOB_DATA_SIZE as u64, + &Entry::serialized_to_blob_size, + ) < transactions.len() + { + break; + } + } + + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (poh_recorder, _entry_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + Some(4), + bank.ticks_per_slot(), + &Pubkey::new_rand(), + &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &Arc::new(PohConfig::default()), + ); + + // Poh Recorder has not working bank, so should throw MaxHeightReached error on + // record + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + + let (processed_transactions_count, mut retryable_txs) = + BankingStage::process_transactions(&bank, &transactions, &poh_recorder); + + assert_eq!(processed_transactions_count, 0,); + + retryable_txs.sort(); + let expected: Vec = (0..transactions.len()).collect(); + assert_eq!(retryable_txs, expected); + } + + Blocktree::destroy(&ledger_path).unwrap(); + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 4065f6749b..828cd947e9 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -939,10 +939,23 @@ impl Bank { ) -> ( Vec>, Vec>, + Vec, ) { debug!("processing transactions: {}", txs.len()); let mut error_counters = ErrorCounters::default(); let now = Instant::now(); + + let retryable_txs: Vec<_> = lock_results + .locked_accounts_results() + .iter() + .enumerate() + .filter_map(|(index, res)| match res { + Err(TransactionError::AccountInUse) => Some(index), + Ok(_) => None, + Err(_) => None, + }) + .collect(); + let sig_results = self.check_transactions( txs, lock_results.locked_accounts_results(), @@ -1004,7 +1017,7 @@ impl Bank { inc_new_counter_info!("bank-process_transactions-txs", tx_count, 0, 1000); inc_new_counter_info!("bank-process_transactions-sigs", signature_count, 0, 1000); Self::update_error_counters(&error_counters); - (loaded_accounts, executed) + (loaded_accounts, executed, retryable_txs) } fn filter_program_errors_and_collect_fee( @@ -1089,7 +1102,7 @@ impl Bank { lock_results: &LockedAccountsResults, max_age: usize, ) -> Vec> { - let (loaded_accounts, executed) = + let (loaded_accounts, executed, _) = self.load_and_execute_transactions(txs, lock_results, max_age); self.commit_transactions(txs, &loaded_accounts, &executed)