diff --git a/src/bank.rs b/src/bank.rs index 14276ee5bf..28d879420d 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -7,8 +7,6 @@ use crate::accounts::{Accounts, ErrorCounters, InstructionAccounts, InstructionL use crate::counter::Counter; use crate::genesis_block::GenesisBlock; use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS}; -use crate::poh_recorder::{PohRecorder, PohRecorderError}; -use crate::result::Error; use crate::rpc_pubsub::RpcSubscriptions; use crate::status_cache::StatusCache; use bincode::deserialize; @@ -322,93 +320,6 @@ impl Bank { self.accounts.unlock_accounts(txs, results) } - pub fn process_and_record_transactions( - &self, - txs: &[Transaction], - poh: &PohRecorder, - ) -> Result<()> { - let now = Instant::now(); - // Once accounts are locked, other threads cannot encode transactions that will modify the - // same account state - let lock_results = self.lock_accounts(txs); - let lock_time = now.elapsed(); - - let now = Instant::now(); - // Use a shorter maximum age when adding transactions into the pipeline. This will reduce - // the likelihood of any single thread getting starved and processing old ids. - // TODO: Banking stage threads should be prioritized to complete faster then this queue - // expires. - let (loaded_accounts, results) = - self.load_and_execute_transactions(txs, lock_results, MAX_ENTRY_IDS as usize / 2); - let load_execute_time = now.elapsed(); - - let record_time = { - let now = Instant::now(); - self.record_transactions(txs, &results, poh)?; - now.elapsed() - }; - - let commit_time = { - let now = Instant::now(); - self.commit_transactions(txs, &loaded_accounts, &results); - now.elapsed() - }; - - let now = Instant::now(); - // Once the accounts are new transactions can enter the pipeline to process them - self.unlock_accounts(&txs, &results); - let unlock_time = now.elapsed(); - debug!( - "lock: {}us load_execute: {}us record: {}us commit: {}us unlock: {}us txs_len: {}", - duration_as_us(&lock_time), - duration_as_us(&load_execute_time), - duration_as_us(&record_time), - duration_as_us(&commit_time), - duration_as_us(&unlock_time), - txs.len(), - ); - Ok(()) - } - - fn record_transactions( - &self, - txs: &[Transaction], - results: &[Result<()>], - poh: &PohRecorder, - ) -> Result<()> { - let processed_transactions: Vec<_> = results - .iter() - .zip(txs.iter()) - .filter_map(|(r, x)| match r { - Ok(_) => Some(x.clone()), - Err(BankError::ProgramError(index, err)) => { - info!("program error {:?}, {:?}", index, err); - Some(x.clone()) - } - Err(ref e) => { - debug!("process transaction failed {:?}", e); - None - } - }) - .collect(); - debug!("processed: {} ", processed_transactions.len()); - // unlock all the accounts with errors which are filtered by the above `filter_map` - if !processed_transactions.is_empty() { - let hash = Transaction::hash(&processed_transactions); - // record and unlock will unlock all the successfull transactions - poh.record(hash, processed_transactions).map_err(|e| { - warn!("record failure: {:?}", e); - match e { - Error::PohRecorderError(PohRecorderError::MaxHeightReached) => { - BankError::MaxHeightReached - } - _ => BankError::RecordFailure, - } - })?; - } - Ok(()) - } - fn load_accounts( &self, txs: &[Transaction], @@ -457,7 +368,7 @@ impl Bank { .collect() } #[allow(clippy::type_complexity)] - fn load_and_execute_transactions( + pub fn load_and_execute_transactions( &self, txs: &[Transaction], lock_results: Vec>, @@ -556,7 +467,7 @@ impl Bank { (loaded_accounts, executed) } - fn commit_transactions( + pub fn commit_transactions( &self, txs: &[Transaction], loaded_accounts: &[Result<(InstructionAccounts, InstructionLoaders)>], @@ -727,7 +638,6 @@ mod tests { use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::transaction::Instruction; use std; - use std::sync::mpsc::channel; #[test] fn test_bank_new() { @@ -1090,44 +1000,6 @@ mod tests { assert!(ids.into_iter().all(move |id| unique.insert(id))); } - #[test] - fn test_bank_record_transactions() { - let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); - let bank = Arc::new(Bank::new(&genesis_block)); - let (entry_sender, entry_receiver) = channel(); - let poh_recorder = - PohRecorder::new(bank.clone(), entry_sender, bank.last_id(), std::u64::MAX); - let pubkey = Keypair::new().pubkey(); - - let transactions = vec![ - SystemTransaction::new_move(&mint_keypair, pubkey, 1, genesis_block.last_id(), 0), - SystemTransaction::new_move(&mint_keypair, pubkey, 1, genesis_block.last_id(), 0), - ]; - - let mut results = vec![Ok(()), Ok(())]; - bank.record_transactions(&transactions, &results, &poh_recorder) - .unwrap(); - let entries = entry_receiver.recv().unwrap(); - assert_eq!(entries[0].transactions.len(), transactions.len()); - - // ProgramErrors should still be recorded - results[0] = Err(BankError::ProgramError( - 1, - ProgramError::ResultWithNegativeTokens, - )); - bank.record_transactions(&transactions, &results, &poh_recorder) - .unwrap(); - let entries = entry_receiver.recv().unwrap(); - assert_eq!(entries[0].transactions.len(), transactions.len()); - - // Other BankErrors should not be recorded - results[0] = Err(BankError::AccountNotFound); - bank.record_transactions(&transactions, &results, &poh_recorder) - .unwrap(); - let entries = entry_receiver.recv().unwrap(); - assert_eq!(entries[0].transactions.len(), transactions.len() - 1); - } - #[test] fn test_bank_storage() { solana_logger::setup(); @@ -1175,61 +1047,6 @@ mod tests { assert_eq!(bank.get_storage_last_id(), storage_last_id); } - #[test] - fn test_bank_process_and_record_transactions() { - let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); - let bank = Arc::new(Bank::new(&genesis_block)); - let pubkey = Keypair::new().pubkey(); - - let transactions = vec![SystemTransaction::new_move( - &mint_keypair, - pubkey, - 1, - genesis_block.last_id(), - 0, - )]; - - let (entry_sender, entry_receiver) = channel(); - let mut poh_recorder = PohRecorder::new( - bank.clone(), - entry_sender, - bank.last_id(), - bank.tick_height() + 1, - ); - - bank.process_and_record_transactions(&transactions, &poh_recorder) - .unwrap(); - poh_recorder.tick().unwrap(); - - let mut need_tick = true; - // read entries until I find mine, might be ticks... - while need_tick { - let entries = entry_receiver.recv().unwrap(); - for entry in entries { - if !entry.is_tick() { - assert_eq!(entry.transactions.len(), transactions.len()); - assert_eq!(bank.get_balance(&pubkey), 1); - } else { - need_tick = false; - } - } - } - - let transactions = vec![SystemTransaction::new_move( - &mint_keypair, - pubkey, - 2, - genesis_block.last_id(), - 0, - )]; - - assert_eq!( - bank.process_and_record_transactions(&transactions, &poh_recorder), - Err(BankError::MaxHeightReached) - ); - - assert_eq!(bank.get_balance(&pubkey), 1); - } #[test] fn test_bank_pay_to_self() { let (genesis_block, mint_keypair) = GenesisBlock::new(1); diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 3458c6a2be..dbb393ed33 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -2,13 +2,14 @@ //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. -use crate::bank::{Bank, BankError}; +use crate::bank::{self, Bank, BankError}; use crate::compute_leader_confirmation_service::ComputeLeaderConfirmationService; use crate::counter::Counter; use crate::entry::Entry; +use crate::last_id_queue::MAX_ENTRY_IDS; use crate::packet::Packets; use crate::packet::SharedPackets; -use crate::poh_recorder::PohRecorder; +use crate::poh_recorder::{PohRecorder, PohRecorderError}; use crate::poh_service::{PohService, PohServiceConfig}; use crate::result::{Error, Result}; use crate::service::Service; @@ -19,6 +20,7 @@ use log::Level; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing; +use solana_sdk::timing::duration_as_us; use solana_sdk::transaction::Transaction; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::{Arc, Mutex}; @@ -122,6 +124,92 @@ impl BankingStage { .collect() } + fn record_transactions( + txs: &[Transaction], + results: &[bank::Result<()>], + poh: &PohRecorder, + ) -> bank::Result<()> { + let processed_transactions: Vec<_> = results + .iter() + .zip(txs.iter()) + .filter_map(|(r, x)| match r { + Ok(_) => Some(x.clone()), + Err(BankError::ProgramError(index, err)) => { + info!("program error {:?}, {:?}", index, err); + Some(x.clone()) + } + Err(ref e) => { + debug!("process transaction failed {:?}", e); + None + } + }) + .collect(); + debug!("processed: {} ", processed_transactions.len()); + // unlock all the accounts with errors which are filtered by the above `filter_map` + if !processed_transactions.is_empty() { + let hash = Transaction::hash(&processed_transactions); + // record and unlock will unlock all the successfull transactions + poh.record(hash, processed_transactions).map_err(|e| { + warn!("record failure: {:?}", e); + match e { + Error::PohRecorderError(PohRecorderError::MaxHeightReached) => { + BankError::MaxHeightReached + } + _ => BankError::RecordFailure, + } + })?; + } + Ok(()) + } + + pub fn process_and_record_transactions( + bank: &Bank, + txs: &[Transaction], + poh: &PohRecorder, + ) -> bank::Result<()> { + let now = Instant::now(); + // Once accounts are locked, other threads cannot encode transactions that will modify the + // same account state + let lock_results = bank.lock_accounts(txs); + let lock_time = now.elapsed(); + + let now = Instant::now(); + // Use a shorter maximum age when adding transactions into the pipeline. This will reduce + // the likelihood of any single thread getting starved and processing old ids. + // TODO: Banking stage threads should be prioritized to complete faster then this queue + // expires. + let (loaded_accounts, results) = + bank.load_and_execute_transactions(txs, lock_results, MAX_ENTRY_IDS as usize / 2); + let load_execute_time = now.elapsed(); + + let record_time = { + let now = Instant::now(); + Self::record_transactions(txs, &results, poh)?; + now.elapsed() + }; + + let commit_time = { + let now = Instant::now(); + bank.commit_transactions(txs, &loaded_accounts, &results); + now.elapsed() + }; + + let now = Instant::now(); + // Once the accounts are new transactions can enter the pipeline to process them + bank.unlock_accounts(&txs, &results); + let unlock_time = now.elapsed(); + debug!( + "lock: {}us load_execute: {}us record: {}us commit: {}us unlock: {}us txs_len: {}", + duration_as_us(&lock_time), + duration_as_us(&load_execute_time), + duration_as_us(&record_time), + duration_as_us(&commit_time), + duration_as_us(&unlock_time), + txs.len(), + ); + Ok(()) + } + /// Sends transactions to the bank. /// /// Returns the number of transactions successfully processed by the bank, which may be less @@ -135,8 +223,11 @@ impl BankingStage { while chunk_start != transactions.len() { let chunk_end = chunk_start + Entry::num_will_fit(&transactions[chunk_start..]); - let result = - bank.process_and_record_transactions(&transactions[chunk_start..chunk_end], poh); + let result = Self::process_and_record_transactions( + bank, + &transactions[chunk_start..chunk_end], + poh, + ); if Err(BankError::MaxHeightReached) == result { break; } @@ -264,6 +355,7 @@ mod tests { use crate::genesis_block::GenesisBlock; use crate::leader_scheduler::DEFAULT_TICKS_PER_SLOT; use crate::packet::to_packets; + use solana_sdk::native_program::ProgramError; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; use std::thread::sleep; @@ -497,4 +589,94 @@ mod tests { assert_eq!(packets.read().unwrap().packets.len(), 1); // TODO: maybe compare actual packet contents too assert_eq!(*start_index, 0); } + + #[test] + fn test_bank_record_transactions() { + let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); + let bank = Arc::new(Bank::new(&genesis_block)); + let (entry_sender, entry_receiver) = channel(); + let poh_recorder = + PohRecorder::new(bank.clone(), entry_sender, bank.last_id(), std::u64::MAX); + let pubkey = Keypair::new().pubkey(); + + let transactions = vec![ + SystemTransaction::new_move(&mint_keypair, pubkey, 1, genesis_block.last_id(), 0), + SystemTransaction::new_move(&mint_keypair, pubkey, 1, genesis_block.last_id(), 0), + ]; + + let mut results = vec![Ok(()), Ok(())]; + BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); + let entries = entry_receiver.recv().unwrap(); + assert_eq!(entries[0].transactions.len(), transactions.len()); + + // ProgramErrors should still be recorded + results[0] = Err(BankError::ProgramError( + 1, + ProgramError::ResultWithNegativeTokens, + )); + BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); + let entries = entry_receiver.recv().unwrap(); + assert_eq!(entries[0].transactions.len(), transactions.len()); + + // Other BankErrors should not be recorded + results[0] = Err(BankError::AccountNotFound); + BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); + let entries = entry_receiver.recv().unwrap(); + assert_eq!(entries[0].transactions.len(), transactions.len() - 1); + } + + #[test] + fn test_bank_process_and_record_transactions() { + let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); + let bank = Arc::new(Bank::new(&genesis_block)); + let pubkey = Keypair::new().pubkey(); + + let transactions = vec![SystemTransaction::new_move( + &mint_keypair, + pubkey, + 1, + genesis_block.last_id(), + 0, + )]; + + let (entry_sender, entry_receiver) = channel(); + let mut poh_recorder = PohRecorder::new( + bank.clone(), + entry_sender, + bank.last_id(), + bank.tick_height() + 1, + ); + + BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap(); + poh_recorder.tick().unwrap(); + + let mut need_tick = true; + // read entries until I find mine, might be ticks... + while need_tick { + let entries = entry_receiver.recv().unwrap(); + for entry in entries { + if !entry.is_tick() { + assert_eq!(entry.transactions.len(), transactions.len()); + assert_eq!(bank.get_balance(&pubkey), 1); + } else { + need_tick = false; + } + } + } + + let transactions = vec![SystemTransaction::new_move( + &mint_keypair, + pubkey, + 2, + genesis_block.last_id(), + 0, + )]; + + assert_eq!( + BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder), + Err(BankError::MaxHeightReached) + ); + + assert_eq!(bank.get_balance(&pubkey), 1); + } }