diff --git a/benches/bank.rs b/benches/bank.rs index 4f31601025..cc0264fe5b 100644 --- a/benches/bank.rs +++ b/benches/bank.rs @@ -5,6 +5,7 @@ extern crate solana; extern crate test; use solana::bank::*; +use solana::hash::hash; use solana::mint::Mint; use solana::signature::{Keypair, KeypairUtil}; use solana::system_transaction::SystemTransaction; @@ -40,6 +41,13 @@ fn bench_process_transaction(bencher: &mut Bencher) { tx }).collect(); + let mut id = bank.last_id(); + + for _ in 0..(MAX_ENTRY_IDS - 1) { + bank.register_entry_id(&id); + id = hash(&id.as_ref()) + } + bencher.iter(|| { // Since benchmarker runs this multiple times, we need to clear the signatures. bank.clear_signatures(); diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index 3c354c9d83..eaa78d220b 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -8,9 +8,10 @@ extern crate test; use rand::{thread_rng, Rng}; use rayon::prelude::*; -use solana::bank::Bank; +use solana::bank::{Bank, MAX_ENTRY_IDS}; use solana::banking_stage::{BankingStage, NUM_THREADS}; use solana::entry::Entry; +use solana::hash::hash; use solana::mint::Mint; use solana::packet::to_packets_chunked; use solana::signature::{KeypairUtil, Signature}; @@ -103,14 +104,23 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { Default::default(), &mint.last_id(), ); + + let mut id = mint.last_id(); + for _ in 0..MAX_ENTRY_IDS { + id = hash(&id.as_ref()); + bank.register_entry_id(&id); + } + bencher.iter(move || { + // make sure the tx last id is still registered + if bank.count_valid_ids(&[mint.last_id()]).len() == 0 { + bank.register_entry_id(&mint.last_id()); + } for v in verified.chunks(verified.len() / NUM_THREADS) { verified_sender.send(v.to_vec()).unwrap(); } check_txs(&signal_receiver, txes); bank.clear_signatures(); - // make sure the tx last id is still registered - bank.register_entry_id(&mint.last_id()); }); } @@ -193,13 +203,22 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { Default::default(), &mint.last_id(), ); + + let mut id = mint.last_id(); + for _ in 0..MAX_ENTRY_IDS { + id = hash(&id.as_ref()); + bank.register_entry_id(&id); + } + bencher.iter(move || { + // make sure the transactions are still valid + if bank.count_valid_ids(&[mint.last_id()]).len() == 0 { + bank.register_entry_id(&mint.last_id()); + } for v in verified.chunks(verified.len() / NUM_THREADS) { verified_sender.send(v.to_vec()).unwrap(); } check_txs(&signal_receiver, txes); bank.clear_signatures(); - // make sure the transactions are still valid - bank.register_entry_id(&mint.last_id()); }); } diff --git a/src/bank.rs b/src/bank.rs index 02c17f86a1..22b66a13b5 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -26,7 +26,7 @@ use signature::Signature; use solana_program_interface::account::{Account, KeyedAccount}; use solana_program_interface::pubkey::Pubkey; use std; -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::result; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Mutex, RwLock}; @@ -119,15 +119,32 @@ struct ErrorCounters { duplicate_signature: usize, } -pub struct LastIdsQ { +pub struct LastIds { /// A FIFO queue of `last_id` items, where each item is a set of signatures /// that have been processed using that `last_id`. Rejected `last_id` /// values are so old that the `last_id` has been pulled out of the queue. - last_ids: VecDeque, - /// Mapping of hashes to signature sets along with timestamp. The bank uses this data to - /// reject transactions with signatures its seen before - last_ids_sigs: HashMap, + /// updated whenever an id is registered + nth: isize, + + /// last id to be registered + last: Option, + + /// Mapping of hashes to signature sets along with timestamp and what nth + /// was when the id was added. The bank uses this data to + /// reject transactions with signatures it's seen before and to reject + /// transactions that are too old (nth is too small) + sigs: HashMap, +} + +impl Default for LastIds { + fn default() -> Self { + LastIds { + nth: 0, + last: None, + sigs: HashMap::new(), + } + } } /// The state of all accounts and contracts after processing its entries. @@ -138,8 +155,8 @@ pub struct Bank { /// set of accounts which are currently in the pipeline account_locks: Mutex>, - /// A FIFO queue of `last_id` items - last_ids_q: RwLock, + /// FIFO queue of `last_id` items + last_ids: RwLock, /// The number of transactions the bank has processed without error since the /// start of the ledger. @@ -155,21 +172,12 @@ pub struct Bank { signature_subscriptions: RwLock>>>, } -impl Default for LastIdsQ { - fn default() -> Self { - LastIdsQ { - last_ids: VecDeque::new(), - last_ids_sigs: HashMap::new(), - } - } -} - impl Default for Bank { fn default() -> Self { Bank { accounts: RwLock::new(HashMap::new()), account_locks: Mutex::new(HashSet::new()), - last_ids_q: RwLock::new(LastIdsQ::default()), + last_ids: RwLock::new(LastIds::default()), transaction_count: AtomicUsize::new(0), finality_time: AtomicUsize::new(std::usize::MAX), account_subscriptions: RwLock::new(HashMap::new()), @@ -209,13 +217,11 @@ impl Bank { /// Return the last entry ID registered. pub fn last_id(&self) -> Hash { - let last_ids_q = self.last_ids_q.read().unwrap(); - let last_item = last_ids_q - .last_ids - .iter() - .last() - .expect("get last item from 'last_ids' list"); - *last_item + self.last_ids + .read() + .unwrap() + .last + .expect("no last_id has been set") } /// Store the given signature. The bank will reject any transaction with the same signature. @@ -229,45 +235,39 @@ impl Bank { /// Forget all signatures. Useful for benchmarking. pub fn clear_signatures(&self) { - for sigs in &mut self.last_ids_q.write().unwrap().last_ids_sigs.values_mut() { + for sigs in &mut self.last_ids.write().unwrap().sigs.values_mut() { sigs.0.clear(); } } - /// Return the position of the last_id in the last_id_queue starting from the back - /// If the last_id is not found last_id_queue.len() is returned - fn compute_entry_id_age(last_id_queue: &VecDeque, entry_id: Hash) -> Option { - for (i, id) in last_id_queue.iter().rev().enumerate() { - if *id == entry_id { - return Some(i); - } - } - None - } /// Check if the age of the entry_id is within the max_age /// return false for any entries with an age equal to or above max_age - fn check_entry_id_age(last_id_queue: &VecDeque, entry_id: Hash, max_age: usize) -> bool { - match Self::compute_entry_id_age(last_id_queue, entry_id) { - Some(age) if age < max_age => true, + fn check_entry_id_age(last_ids: &LastIds, entry_id: Hash, max_age: usize) -> bool { + let entry = last_ids.sigs.get(&entry_id); + + match entry { + Some(entry) => ((last_ids.nth - entry.2) as usize) < max_age, _ => false, } } fn reserve_signature_with_last_id( - last_ids_sigs: &mut HashMap, + last_ids: &mut LastIds, last_id: &Hash, sig: &Signature, ) -> Result<()> { - if let Some(entry) = last_ids_sigs.get_mut(last_id) { - return Self::reserve_signature(&mut entry.0, sig); + if let Some(entry) = last_ids.sigs.get_mut(last_id) { + if ((last_ids.nth - entry.2) as usize) <= MAX_ENTRY_IDS { + return Self::reserve_signature(&mut entry.0, sig); + } } Err(BankError::LastIdNotFound) } #[cfg(test)] fn reserve_signature_with_last_id_test(&self, sig: &Signature, last_id: &Hash) -> Result<()> { - let mut last_ids_q = self.last_ids_q.write().unwrap(); - Self::reserve_signature_with_last_id(&mut last_ids_q.last_ids_sigs, last_id, sig) + let mut last_ids = self.last_ids.write().unwrap(); + Self::reserve_signature_with_last_id(&mut last_ids, last_id, sig) } fn update_signature_status( @@ -280,7 +280,7 @@ impl Bank { } fn update_signature_status_with_last_id( - last_ids_sigs: &mut HashMap, + last_ids_sigs: &mut HashMap, signature: &Signature, result: &Result<()>, last_id: &Hash, @@ -291,10 +291,10 @@ impl Bank { } fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { - let mut last_ids_q = self.last_ids_q.write().unwrap(); + let mut last_ids = self.last_ids.write().unwrap(); for (i, tx) in txs.iter().enumerate() { Self::update_signature_status_with_last_id( - &mut last_ids_q.last_ids_sigs, + &mut last_ids.sigs, &tx.signature, &res[i], &tx.last_id, @@ -320,11 +320,13 @@ impl Bank { /// Return a vec of tuple of (valid index, timestamp) /// index is into the passed ids slice to avoid copying hashes pub fn count_valid_ids(&self, ids: &[Hash]) -> Vec<(usize, u64)> { - let last_ids_q = self.last_ids_q.read().unwrap(); + let last_ids = self.last_ids.read().unwrap(); let mut ret = Vec::new(); for (i, id) in ids.iter().enumerate() { - if let Some(entry) = last_ids_q.last_ids_sigs.get(id) { - ret.push((i, entry.1)); + if let Some(entry) = last_ids.sigs.get(id) { + if ((last_ids.nth - entry.2) as usize) <= MAX_ENTRY_IDS { + ret.push((i, entry.1)); + } } } ret @@ -335,17 +337,26 @@ impl Bank { /// the oldest ones once its internal cache is full. Once boot, the /// bank will reject transactions using that `last_id`. pub fn register_entry_id(&self, last_id: &Hash) { - let mut last_ids_q = self.last_ids_q.write().unwrap(); - if last_ids_q.last_ids.len() >= MAX_ENTRY_IDS { - let id = last_ids_q.last_ids.pop_front().unwrap(); - debug!("removing last_id {}", id); - last_ids_q.last_ids_sigs.remove(&id); + let mut last_ids = self.last_ids.write().unwrap(); + + let last_ids_nth = last_ids.nth; + + // this clean up can be deferred until sigs gets larger + // because we verify entry.nth every place we check for validity + if last_ids.sigs.len() >= MAX_ENTRY_IDS { + last_ids + .sigs + .retain(|_, (_, _, nth)| ((last_ids_nth - *nth) as usize) <= MAX_ENTRY_IDS); } + + last_ids + .sigs + .insert(*last_id, (HashMap::new(), timestamp(), last_ids_nth)); + + last_ids.nth += 1; + last_ids.last = Some(*last_id); + inc_new_counter_info!("bank-register_entry_id-registered", 1); - last_ids_q - .last_ids_sigs - .insert(*last_id, (HashMap::new(), timestamp())); - last_ids_q.last_ids.push_back(*last_id); } /// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method. @@ -390,7 +401,7 @@ impl Bank { &self, tx: &Transaction, accounts: &HashMap, - last_ids_q: &mut LastIdsQ, + last_ids: &mut LastIds, max_age: usize, error_counters: &mut ErrorCounters, ) -> Result> { @@ -402,18 +413,14 @@ impl Bank { error_counters.insufficient_funds += 1; Err(BankError::InsufficientFundsForFee) } else { - if !Self::check_entry_id_age(&last_ids_q.last_ids, tx.last_id, max_age) { + if !Self::check_entry_id_age(&last_ids, tx.last_id, max_age) { error_counters.last_id_not_found += 1; return Err(BankError::LastIdNotFound); } // There is no way to predict what contract will execute without an error // If a fee can pay for execution then the contract will be scheduled - let err = Self::reserve_signature_with_last_id( - &mut last_ids_q.last_ids_sigs, - &tx.last_id, - &tx.signature, - ); + let err = Self::reserve_signature_with_last_id(last_ids, &tx.last_id, &tx.signature); if let Err(BankError::LastIdNotFound) = err { error_counters.reserve_last_id += 1; } else if let Err(BankError::DuplicateSignature) = err { @@ -467,12 +474,12 @@ impl Bank { error_counters: &mut ErrorCounters, ) -> Vec<(Result>)> { let accounts = self.accounts.read().unwrap(); - let mut last_ids_q = self.last_ids_q.write().unwrap(); + let mut last_ids = self.last_ids.write().unwrap(); txs.iter() .zip(results.into_iter()) .map(|etx| match etx { (tx, Ok(())) => { - self.load_account(tx, &accounts, &mut last_ids_q, max_age, error_counters) + self.load_account(tx, &accounts, &mut last_ids, max_age, error_counters) } (_, Err(e)) => Err(e), }).collect() @@ -1125,8 +1132,8 @@ impl Bank { } pub fn get_signature_status(&self, signature: &Signature) -> Result<()> { - let last_ids_q = self.last_ids_q.read().unwrap(); - for (signatures, _) in last_ids_q.last_ids_sigs.values() { + let last_ids = self.last_ids.read().unwrap(); + for (signatures, _, _) in last_ids.sigs.values() { if let Some(res) = signatures.get(signature) { return res.clone(); } @@ -1139,10 +1146,10 @@ impl Bank { } pub fn get_signature(&self, last_id: &Hash, signature: &Signature) -> Option> { - self.last_ids_q + self.last_ids .read() .unwrap() - .last_ids_sigs + .sigs .get(last_id) .and_then(|sigs| sigs.0.get(signature).cloned()) } @@ -1909,35 +1916,6 @@ mod tests { ); } #[test] - fn test_entry_id_age() { - let mut q = VecDeque::new(); - let hash1 = Hash::default(); - let hash2 = hash(hash1.as_ref()); - let hash3 = hash(hash2.as_ref()); - assert_eq!(Bank::compute_entry_id_age(&q, hash1), None); - q.push_back(hash1); - assert_eq!(Bank::compute_entry_id_age(&q, hash1), Some(0)); - q.push_back(hash2); - assert_eq!(Bank::compute_entry_id_age(&q, hash1), Some(1)); - assert_eq!(Bank::compute_entry_id_age(&q, hash2), Some(0)); - assert_eq!(Bank::compute_entry_id_age(&q, hash3), None); - - // all are below 2 - assert_eq!(Bank::check_entry_id_age(&q, hash2, 2), true); - assert_eq!(Bank::check_entry_id_age(&q, hash1, 2), true); - - // hash2 is most recent with age 0, max is 1, anything equal to max or above is rejected - assert_eq!(Bank::check_entry_id_age(&q, hash2, 1), true); - assert_eq!(Bank::check_entry_id_age(&q, hash1, 1), false); - - // max_age 0 is always rejected - assert_eq!(Bank::check_entry_id_age(&q, hash1, 0), false); - assert_eq!(Bank::check_entry_id_age(&q, hash2, 0), false); - - // hash3 is not in the q - assert_eq!(Bank::check_entry_id_age(&q, hash3, 3), false); - } - #[test] fn test_first_err() { assert_eq!(Bank::first_err(&[Ok(())]), Ok(())); assert_eq!(