diff --git a/benches/bank.rs b/benches/bank.rs index 4af67d84a1..8c4cde215a 100644 --- a/benches/bank.rs +++ b/benches/bank.rs @@ -5,7 +5,6 @@ extern crate solana; extern crate test; use bincode::serialize; -use rayon::prelude::*; use solana::bank::*; use solana::hash::hash; use solana::mint::Mint; @@ -21,7 +20,7 @@ fn bench_process_transaction(bencher: &mut Bencher) { // Create transactions between unrelated parties. let transactions: Vec<_> = (0..4096) - .into_par_iter() + .into_iter() .map(|i| { // Seed the 'from' account. let rando0 = Keypair::new(); @@ -32,7 +31,7 @@ fn bench_process_transaction(bencher: &mut Bencher) { mint.last_id(), 0, ); - assert!(bank.process_transaction(&tx).is_ok()); + assert_eq!(bank.process_transaction(&tx), Ok(())); // Seed the 'to' account and a cell for its signature. let last_id = hash(&serialize(&i).unwrap()); // Unique hash @@ -40,7 +39,7 @@ fn bench_process_transaction(bencher: &mut Bencher) { let rando1 = Keypair::new(); let tx = Transaction::system_move(&rando0, rando1.pubkey(), 1, last_id, 0); - assert!(bank.process_transaction(&tx).is_ok()); + assert_eq!(bank.process_transaction(&tx), Ok(())); // Finally, return the transaction to the benchmark. tx diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index 0991a3735d..22d6a3e2bb 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -111,8 +111,6 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { #[bench] fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { - //use solana::logger; - //logger::setup(); let progs = 5; let txes = 1000 * NUM_THREADS; let mint_total = 1_000_000_000_000; diff --git a/src/bank.rs b/src/bank.rs index c94ce092e1..41906a0242 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -15,14 +15,16 @@ use ledger::Block; use log::Level; use mint::Mint; use payment_plan::Payment; -use signature::{Keypair, Signature}; +use poh_recorder::PohRecorder; +use signature::Keypair; +use signature::Signature; use solana_program_interface::account::{Account, KeyedAccount}; use solana_program_interface::pubkey::Pubkey; use std; -use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::result; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::RwLock; +use std::sync::{Mutex, RwLock}; use std::time::Instant; use storage_program::StorageProgram; use system_program::SystemProgram; @@ -39,13 +41,16 @@ use window::WINDOW_SIZE; /// but requires clients to update its `last_id` more frequently. Raising the value /// lengthens the time a client must wait to be certain a missing transaction will /// not be processed by the network. -pub const MAX_ENTRY_IDS: usize = 1024 * 16; +pub const MAX_ENTRY_IDS: usize = 1024 * 32; pub const VERIFY_BLOCK_SIZE: usize = 16; /// Reasons a transaction might be rejected. #[derive(Debug, PartialEq, Eq, Clone)] pub enum BankError { + /// This Pubkey is being processed in another transaction + AccountInUse, + /// Attempt to debit from `Pubkey`, but no found no record of a prior credit. AccountNotFound, @@ -85,6 +90,9 @@ pub enum BankError { /// The program returned an error ProgramRuntimeError(u8), + + /// Recoding into PoH failed + RecordFailure, } pub type Result = result::Result; @@ -94,13 +102,16 @@ type SignatureStatusMap = HashMap>; struct ErrorCounters { account_not_found_validator: usize, account_not_found_leader: usize, + account_in_use: usize, } - /// The state of all accounts and contracts after processing its entries. pub struct Bank { /// A map of account public keys to the balance in that account. accounts: RwLock>, + /// set of accounts which are currently in the pipeline + account_locks: Mutex>, + /// A FIFO queue of `last_id` items, where each item is a set of signatures /// that have been processed using that `last_id`. Rejected `last_id` /// values are so old that the `last_id` has been pulled out of the queue. @@ -129,6 +140,7 @@ impl Default for Bank { fn default() -> Self { Bank { accounts: RwLock::new(HashMap::new()), + account_locks: Mutex::new(HashSet::new()), last_ids: RwLock::new(VecDeque::new()), last_ids_sigs: RwLock::new(HashMap::new()), transaction_count: AtomicUsize::new(0), @@ -200,18 +212,23 @@ impl Bank { } } - fn reserve_signature_with_last_id(&self, signature: &Signature, last_id: &Hash) -> Result<()> { - if let Some(entry) = self - .last_ids_sigs - .write() - .expect("'last_ids' read lock in reserve_signature_with_last_id") - .get_mut(last_id) - { - return Self::reserve_signature(&mut entry.0, signature); + fn reserve_signature_with_last_id( + last_ids_sigs: &mut HashMap, + last_id: &Hash, + sig: &Signature, + ) -> Result<()> { + if let Some(entry) = last_ids_sigs.get_mut(last_id) { + return Self::reserve_signature(&mut entry.0, sig); } Err(BankError::LastIdNotFound) } + #[cfg(test)] + fn reserve_signature_with_last_id_test(&self, sig: &Signature, last_id: &Hash) -> Result<()> { + let mut last_ids_sigs = self.last_ids_sigs.write().unwrap(); + Self::reserve_signature_with_last_id(&mut last_ids_sigs, last_id, sig) + } + fn update_signature_status( signatures: &mut SignatureStatusMap, signature: &Signature, @@ -222,19 +239,25 @@ impl Bank { } fn update_signature_status_with_last_id( - &self, + last_ids_sigs: &mut HashMap, signature: &Signature, result: &Result<()>, last_id: &Hash, ) { - if let Some(entry) = self.last_ids_sigs.write().unwrap().get_mut(last_id) { + if let Some(entry) = last_ids_sigs.get_mut(last_id) { Self::update_signature_status(&mut entry.0, signature, result); } } fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { + let mut last_ids = self.last_ids_sigs.write().unwrap(); for (i, tx) in txs.iter().enumerate() { - self.update_signature_status_with_last_id(&tx.signature, &res[i], &tx.last_id); + Self::update_signature_status_with_last_id( + &mut last_ids, + &tx.signature, + &res[i], + &tx.last_id, + ); } } @@ -277,7 +300,8 @@ impl Bank { /// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method. pub fn process_transaction(&self, tx: &Transaction) -> Result<()> { - match self.process_transactions(&[tx.clone()])[0] { + let txs = vec![tx.clone()]; + match self.process_transactions(&txs)[0] { Err(ref e) => { info!("process_transaction error: {:?}", e); Err((*e).clone()) @@ -285,11 +309,38 @@ impl Bank { Ok(_) => Ok(()), } } + fn lock_account( + account_locks: &mut HashSet, + keys: &[Pubkey], + error_counters: &mut ErrorCounters, + ) -> Result<()> { + // Copy all the accounts + for k in keys { + if account_locks.contains(k) { + error_counters.account_in_use += 1; + return Err(BankError::AccountInUse); + } + } + for k in keys { + account_locks.insert(*k); + } + Ok(()) + } + + fn unlock_account(tx: &Transaction, result: &Result<()>, account_locks: &mut HashSet) { + match result { + Err(BankError::AccountInUse) => (), + _ => for k in &tx.account_keys { + account_locks.remove(k); + }, + } + } fn load_account( &self, tx: &Transaction, accounts: &HashMap, + last_ids_sigs: &mut HashMap, error_counters: &mut ErrorCounters, ) -> Result> { // Copy all the accounts @@ -310,21 +361,54 @@ impl Bank { .collect(); // There is no way to predict what contract will execute without an error // If a fee can pay for execution then the contract will be scheduled - self.reserve_signature_with_last_id(&tx.signature, &tx.last_id)?; + let err = + Self::reserve_signature_with_last_id(last_ids_sigs, &tx.last_id, &tx.signature); + err?; called_accounts[0].tokens -= tx.fee; Ok(called_accounts) } } + /// This function will prevent multiple threads from modifying the same account state at the + /// same time + #[must_use] + fn lock_accounts(&self, txs: &[Transaction]) -> Vec> { + let mut account_locks = self.account_locks.lock().unwrap(); + let mut error_counters = ErrorCounters::default(); + let rv = txs + .iter() + .map(|tx| Self::lock_account(&mut account_locks, &tx.account_keys, &mut error_counters)) + .collect(); + inc_new_counter_info!( + "bank-process_transactions-account_in_use", + error_counters.account_in_use + ); + rv + } + + /// Once accounts are unlocked, new transactions that modify that state can enter the pipeline + fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) { + debug!("bank unlock accounts"); + let mut account_locks = self.account_locks.lock().unwrap(); + txs.iter() + .zip(results.iter()) + .for_each(|(tx, result)| Self::unlock_account(tx, result, &mut account_locks)); + } + fn load_accounts( &self, txs: &[Transaction], - accounts: &HashMap, + results: Vec>, error_counters: &mut ErrorCounters, - ) -> Vec>> { + ) -> Vec<(Result>)> { + let accounts = self.accounts.read().unwrap(); + let mut last_sigs = self.last_ids_sigs.write().unwrap(); txs.iter() - .map(|tx| self.load_account(tx, accounts, error_counters)) - .collect() + .zip(results.into_iter()) + .map(|etx| match etx { + (tx, Ok(())) => self.load_account(tx, &accounts, &mut last_sigs, error_counters), + (_, Err(e)) => Err(e), + }).collect() } pub fn verify_transaction( @@ -477,11 +561,12 @@ impl Bank { } pub fn store_accounts( + &self, txs: &[Transaction], res: &[Result<()>], loaded: &[Result>], - accounts: &mut HashMap, ) { + let mut accounts = self.accounts.write().unwrap(); for (i, racc) in loaded.iter().enumerate() { if res[i].is_err() || racc.is_err() { continue; @@ -501,22 +586,80 @@ impl Bank { } } + pub fn process_and_record_transactions( + &self, + txs: &[Transaction], + poh: &PohRecorder, + ) -> Result<()> { + let now = Instant::now(); + // Once accounts are locked, other threads cannot encode transactions that will modify the + // same account state + let locked_accounts = self.lock_accounts(txs); + let lock_time = now.elapsed(); + let now = Instant::now(); + let results = self.execute_and_commit_transactions(txs, locked_accounts); + let process_time = now.elapsed(); + let now = Instant::now(); + self.record_transactions(txs, &results, poh)?; + let record_time = now.elapsed(); + let now = Instant::now(); + // Once the accounts are unlocked new transactions can enter the pipeline to process them + self.unlock_accounts(&txs, &results); + let unlock_time = now.elapsed(); + debug!( + "lock: {}us process: {}us record: {}us unlock: {}us txs_len={}", + duration_as_us(&lock_time), + duration_as_us(&process_time), + duration_as_us(&record_time), + duration_as_us(&unlock_time), + txs.len(), + ); + Ok(()) + } + + fn record_transactions( + &self, + txs: &[Transaction], + results: &[Result<()>], + poh: &PohRecorder, + ) -> Result<()> { + let processed_transactions: Vec<_> = results + .iter() + .zip(txs.iter()) + .filter_map(|(r, x)| match r { + Ok(_) => Some(x.clone()), + Err(ref e) => { + debug!("process transaction failed {:?}", e); + None + } + }).collect(); + // unlock all the accounts with errors which are filtered by the above `filter_map` + if !processed_transactions.is_empty() { + let hash = Transaction::hash(&processed_transactions); + debug!("processed ok: {} {}", processed_transactions.len(), hash); + // record and unlock will unlock all the successfull transactions + poh.record(hash, processed_transactions).map_err(|e| { + warn!("record failure: {:?}", e); + BankError::RecordFailure + })?; + } + Ok(()) + } + /// Process a batch of transactions. #[must_use] - pub fn process_transactions(&self, txs: &[Transaction]) -> Vec> { + pub fn execute_and_commit_transactions( + &self, + txs: &[Transaction], + locked_accounts: Vec>, + ) -> Vec> { debug!("processing transactions: {}", txs.len()); - // TODO right now a single write lock is held for the duration of processing all the - // transactions - // To break this lock each account needs to be locked to prevent concurrent access - let mut accounts = self.accounts.write().unwrap(); - let txs_len = txs.len(); let mut error_counters = ErrorCounters::default(); let now = Instant::now(); - let mut loaded_accounts = self.load_accounts(&txs, &accounts, &mut error_counters); + let mut loaded_accounts = self.load_accounts(txs, locked_accounts, &mut error_counters); let load_elapsed = now.elapsed(); let now = Instant::now(); - - let res: Vec<_> = loaded_accounts + let executed: Vec> = loaded_accounts .iter_mut() .zip(txs.iter()) .map(|(acc, tx)| match acc { @@ -525,19 +668,20 @@ impl Bank { }).collect(); let execution_elapsed = now.elapsed(); let now = Instant::now(); - Self::store_accounts(&txs, &res, &loaded_accounts, &mut accounts); - self.update_transaction_statuses(&txs, &res); + self.store_accounts(txs, &executed, &loaded_accounts); + // once committed there is no way to unroll let write_elapsed = now.elapsed(); debug!( - "load: {}us execution: {}us write: {}us txs_len={}", + "load: {}us execute: {}us store: {}us txs_len={}", duration_as_us(&load_elapsed), duration_as_us(&execution_elapsed), duration_as_us(&write_elapsed), - txs_len + txs.len(), ); + self.update_transaction_statuses(txs, &executed); let mut tx_count = 0; let mut err_count = 0; - for r in &res { + for r in &executed { if r.is_ok() { tx_count += 1; } else { @@ -562,14 +706,21 @@ impl Bank { error_counters.account_not_found_leader ); } + inc_new_counter_info!("bank-process_transactions-error_count", err_count); } - let cur_tx_count = self.transaction_count.load(Ordering::Relaxed); - if ((cur_tx_count + tx_count) & !(262_144 - 1)) > cur_tx_count & !(262_144 - 1) { - info!("accounts.len: {}", accounts.len()); - } + self.transaction_count .fetch_add(tx_count, Ordering::Relaxed); - res + inc_new_counter_info!("bank-process_transactions-txs", tx_count); + executed + } + + #[must_use] + pub fn process_transactions(&self, txs: &[Transaction]) -> Vec> { + let locked_accounts = self.lock_accounts(txs); + let results = self.execute_and_commit_transactions(txs, locked_accounts); + self.unlock_accounts(txs, &results); + results } pub fn process_entry(&self, entry: &Entry) -> Result<()> { @@ -789,9 +940,11 @@ mod tests { use hash::hash; use ledger; use logger; + use signature::Keypair; use signature::{GenKeys, KeypairUtil}; use std; use std::io::{BufReader, Cursor, Seek, SeekFrom}; + use system_transaction::SystemTransaction; use transaction::Instruction; #[test] @@ -818,13 +971,37 @@ mod tests { assert_eq!(bank.transaction_count(), 2); } + #[test] + fn test_one_source_two_tx_one_batch() { + let mint = Mint::new(1); + let key1 = Keypair::new().pubkey(); + let key2 = Keypair::new().pubkey(); + let bank = Bank::new(&mint); + assert_eq!(bank.last_id(), mint.last_id()); + + let t1 = Transaction::system_move(&mint.keypair(), key1, 1, mint.last_id(), 0); + let t2 = Transaction::system_move(&mint.keypair(), key2, 1, mint.last_id(), 0); + let res = bank.process_transactions(&vec![t1.clone(), t2.clone()]); + assert_eq!(res.len(), 2); + assert_eq!(res[0], Ok(())); + assert_eq!(res[1], Err(BankError::AccountInUse)); + assert_eq!(bank.get_balance(&mint.pubkey()), 0); + assert_eq!(bank.get_balance(&key1), 1); + assert_eq!(bank.get_balance(&key2), 0); + assert_eq!(bank.get_signature(&t1.last_id, &t1.signature), Some(Ok(()))); + // TODO: Transactions that fail to pay a fee could be dropped silently + assert_eq!( + bank.get_signature(&t2.last_id, &t2.signature), + Some(Err(BankError::AccountInUse)) + ); + } + #[test] fn test_one_tx_two_out_atomic_fail() { let mint = Mint::new(1); let key1 = Keypair::new().pubkey(); let key2 = Keypair::new().pubkey(); let bank = Bank::new(&mint); - let spend = SystemProgram::Move { tokens: 1 }; let instructions = vec![ Instruction { @@ -889,7 +1066,7 @@ mod tests { ); let res = bank.process_transactions(&vec![t1.clone()]); assert_eq!(res.len(), 1); - assert!(res[0].is_ok()); + assert_eq!(res[0], Ok(())); assert_eq!(bank.get_balance(&mint.pubkey()), 0); assert_eq!(bank.get_balance(&key1), 1); assert_eq!(bank.get_balance(&key2), 1); @@ -931,7 +1108,7 @@ mod tests { let res = bank.process_transaction(&tx); // Result failed, but signature is registered - assert!(!res.is_ok()); + assert!(res.is_err()); assert!(bank.has_signature(&signature)); assert_matches!( bank.get_signature_status(&signature), @@ -992,12 +1169,12 @@ mod tests { let mint = Mint::new(1); let bank = Bank::new(&mint); let signature = Signature::default(); - assert!( - bank.reserve_signature_with_last_id(&signature, &mint.last_id()) - .is_ok() + assert_eq!( + bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()), + Ok(()) ); assert_eq!( - bank.reserve_signature_with_last_id(&signature, &mint.last_id()), + bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()), Err(BankError::DuplicateSignature) ); } @@ -1007,12 +1184,12 @@ mod tests { let mint = Mint::new(1); let bank = Bank::new(&mint); let signature = Signature::default(); - bank.reserve_signature_with_last_id(&signature, &mint.last_id()) + bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()) .unwrap(); bank.clear_signatures(); - assert!( - bank.reserve_signature_with_last_id(&signature, &mint.last_id()) - .is_ok() + assert_eq!( + bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()), + Ok(()) ); } @@ -1021,9 +1198,9 @@ mod tests { let mint = Mint::new(1); let bank = Bank::new(&mint); let signature = Signature::default(); - bank.reserve_signature_with_last_id(&signature, &mint.last_id()) + bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()) .expect("reserve signature"); - assert!(bank.get_signature_status(&signature).is_ok()); + assert_eq!(bank.get_signature_status(&signature), Ok(())); } #[test] @@ -1031,7 +1208,7 @@ mod tests { let mint = Mint::new(1); let bank = Bank::new(&mint); let signature = Signature::default(); - bank.reserve_signature_with_last_id(&signature, &mint.last_id()) + bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()) .expect("reserve signature"); assert!(bank.has_signature(&signature)); } @@ -1047,7 +1224,7 @@ mod tests { } // Assert we're no longer able to use the oldest entry ID. assert_eq!( - bank.reserve_signature_with_last_id(&signature, &mint.last_id()), + bank.reserve_signature_with_last_id_test(&signature, &mint.last_id()), Err(BankError::LastIdNotFound) ); } @@ -1100,7 +1277,7 @@ mod tests { // Now ensure the TX is accepted despite pointing to the ID of an empty entry. bank.process_entries(&[entry]).unwrap(); - assert!(bank.process_transaction(&tx).is_ok()); + assert_eq!(bank.process_transaction(&tx), Ok(())); } #[test] @@ -1116,12 +1293,19 @@ mod tests { mint: &Mint, keypairs: &[Keypair], ) -> impl Iterator { - let hash = mint.last_id(); - let transactions: Vec<_> = keypairs - .iter() - .map(|keypair| Transaction::system_new(&mint.keypair(), keypair.pubkey(), 1, hash)) - .collect(); - let entries = ledger::next_entries(&hash, 0, transactions); + let mut hash = mint.last_id(); + let mut entries: Vec = vec![]; + for k in keypairs { + let txs = vec![Transaction::system_new( + &mint.keypair(), + k.pubkey(), + 1, + hash, + )]; + let mut e = ledger::next_entries(&hash, 0, txs); + entries.append(&mut e); + hash = entries.last().unwrap().id; + } entries.into_iter() } @@ -1265,4 +1449,37 @@ mod tests { def_bank.set_finality(90); assert_eq!(def_bank.finality(), 90); } + #[test] + fn test_interleaving_locks() { + let mint = Mint::new(3); + let bank = Bank::new(&mint); + let alice = Keypair::new(); + let bob = Keypair::new(); + + let tx1 = Transaction::system_new(&mint.keypair(), alice.pubkey(), 1, mint.last_id()); + let pay_alice = vec![tx1]; + + let locked_alice = bank.lock_accounts(&pay_alice); + let results_alice = bank.execute_and_commit_transactions(&pay_alice, locked_alice); + assert_eq!(results_alice[0], Ok(())); + + // try executing an interleaved transfer twice + assert_eq!( + bank.transfer(1, &mint.keypair(), bob.pubkey(), mint.last_id()), + Err(BankError::AccountInUse) + ); + // the second time shoudl fail as well + // this verifies that `unlock_accounts` doesn't unlock `AccountInUse` accounts + assert_eq!( + bank.transfer(1, &mint.keypair(), bob.pubkey(), mint.last_id()), + Err(BankError::AccountInUse) + ); + + bank.unlock_accounts(&pay_alice, &results_alice); + + assert_matches!( + bank.transfer(2, &mint.keypair(), bob.pubkey(), mint.last_id()), + Ok(_) + ); + } } diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 59403fa8ae..336a221911 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -26,7 +26,7 @@ use timing; use transaction::Transaction; // number of threads is 1 until mt bank is ready -pub const NUM_THREADS: usize = 1; +pub const NUM_THREADS: usize = 10; /// Stores the stage's thread handle and output receiver. pub struct BankingStage { @@ -165,24 +165,8 @@ impl BankingStage { while chunk_start != transactions.len() { let chunk_end = chunk_start + Entry::num_will_fit(&transactions[chunk_start..]); - let results = bank.process_transactions(&transactions[chunk_start..chunk_end]); + bank.process_and_record_transactions(&transactions[chunk_start..chunk_end], poh)?; - let processed_transactions: Vec<_> = transactions[chunk_start..chunk_end] - .into_iter() - .enumerate() - .filter_map(|(i, x)| match results[i] { - Ok(_) => Some(x.clone()), - Err(ref e) => { - debug!("process transaction failed {:?}", e); - None - } - }).collect(); - - if !processed_transactions.is_empty() { - let hash = Transaction::hash(&processed_transactions); - debug!("processed ok: {} {}", processed_transactions.len(), hash); - poh.record(hash, processed_transactions)?; - } chunk_start = chunk_end; } debug!("done process_transactions"); @@ -403,11 +387,9 @@ mod tests { // the account balance below zero before the credit is added. let bank = Bank::new(&mint); for entry in entries { - assert!( - bank.process_transactions(&entry.transactions) - .into_iter() - .all(|x| x.is_ok()) - ); + bank.process_transactions(&entry.transactions) + .iter() + .for_each(|x| assert_eq!(*x, Ok(()))); } assert_eq!(bank.get_balance(&alice.pubkey()), 1); } diff --git a/src/vote_stage.rs b/src/vote_stage.rs index 8343224d23..2285de6930 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -285,7 +285,7 @@ pub mod tests { // vote should be valid let blob = &vote_blob.unwrap()[0]; let tx = deserialize(&(blob.read().unwrap().data)).unwrap(); - assert!(bank.process_transaction(&tx).is_ok()); + assert_eq!(bank.process_transaction(&tx), Ok(())); } #[test]