From 00516e50a148da86ceb623d736fda8a07ab2e463 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Fri, 22 Jun 2018 10:44:31 -0700 Subject: [PATCH] last_ids opt --- src/bank.rs | 63 +++++++++++++++++++++++++++++--------------- src/banking_stage.rs | 2 +- src/timing.rs | 4 +++ 3 files changed, 47 insertions(+), 22 deletions(-) diff --git a/src/bank.rs b/src/bank.rs index 164d75c8e5..4c1d43a20e 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -20,6 +20,7 @@ use std::sync::RwLock; use transaction::{Instruction, Plan, Transaction}; use std::time::Instant; use std::cell::Cell; +use timing::duration_as_us; /// The number of most recent `last_id` values that the bank will track the signatures /// of. Once the bank discards a `last_id`, it will reject any transactions that use @@ -73,7 +74,9 @@ pub struct Bank { /// that have been processed using that `last_id`. The bank uses this data to /// reject transactions with signatures its seen before as well as `last_id` /// values that are so old that its `last_id` has been pulled out of the queue. - last_ids: RwLock>)>>, + last_ids: RwLock>, + + last_ids_sigs: RwLock>>>, /// The set of trusted timekeepers. A Timestamp transaction from a `PublicKey` /// outside this set will be discarded. Note that if validators do not have the @@ -97,6 +100,7 @@ impl Bank { balances: RwLock::new(HashMap::new()), pending: RwLock::new(HashMap::new()), last_ids: RwLock::new(VecDeque::new()), + last_ids_sigs: RwLock::new(HashMap::new()), time_sources: RwLock::new(HashSet::new()), last_time: RwLock::new(Utc.timestamp(0, 0)), transaction_count: AtomicUsize::new(0), @@ -159,12 +163,19 @@ impl Bank { pub fn last_id(&self) -> Hash { let last_ids = self.last_ids.read().expect("'last_ids' read lock"); let last_item = last_ids.iter().last().expect("empty 'last_ids' list"); - last_item.0 + *last_item } /// Store the given signature. The bank will reject any transaction with the same signature. fn reserve_signature(signatures: &RwLock>, sig: &Signature) -> Result<()> { - if signatures + let mut sigs_l = signatures.write().unwrap(); + if let Some(sig) = sigs_l.get(sig) { + return Err(BankError::DuplicateSiganture(*sig)); + } + { + sigs_l.insert(*sig); + } + /*if signatures .read() .expect("'signatures' read lock") .contains(sig) @@ -174,7 +185,7 @@ impl Bank { signatures .write() .expect("'signatures' write lock") - .insert(*sig); + .insert(*sig);*/ Ok(()) } @@ -188,26 +199,23 @@ impl Bank { /// Forget the given `signature` with `last_id` because the transaction was rejected. fn forget_signature_with_last_id(&self, signature: &Signature, last_id: &Hash) { - if let Some(entry) = self.last_ids + if let Some(entry) = self.last_ids_sigs .read() .expect("'last_ids' read lock in forget_signature_with_last_id") - .iter() - .rev() - .find(|x| x.0 == *last_id) + .get(last_id) { - Self::forget_signature(&entry.1, signature); + Self::forget_signature(entry, signature); } } fn reserve_signature_with_last_id(&self, signature: &Signature, last_id: &Hash) -> Result<()> { - if let Some(entry) = self.last_ids + if let Some(entry) = self.last_ids_sigs .read() .expect("'last_ids' read lock in reserve_signature_with_last_id") - .iter() - .rev() - .find(|x| x.0 == *last_id) + .get(last_id) { - return Self::reserve_signature(&entry.1, signature); + return Self::reserve_signature(&entry, signature); + //return Ok(()); } Err(BankError::LastIdNotFound(*last_id)) } @@ -220,10 +228,15 @@ impl Bank { let mut last_ids = self.last_ids .write() .expect("'last_ids' write lock in register_entry_id"); + let mut last_ids_sigs = self.last_ids_sigs + .write() + .expect("last_ids_sigs write lock"); if last_ids.len() >= MAX_ENTRY_IDS { - last_ids.pop_front(); + let id = last_ids.pop_front().unwrap(); + last_ids_sigs.remove(&id); } - last_ids.push_back((*last_id, RwLock::new(HashSet::new()))); + last_ids_sigs.insert(*last_id, RwLock::new(HashSet::new())); + last_ids.push_back(*last_id); } pub fn apply_debits(&self, tr: &Transaction, bals: &mut HashMap) -> Result<()> { @@ -348,13 +361,15 @@ impl Bank { // Run all debits first to filter out any transactions that can't be processed // in parallel deterministically. let bals = &mut self.balances.write().unwrap(); - info!("processing Transactions {}", txs.len()); + debug!("processing Transactions {}", txs.len()); + let txs_len = txs.len(); let now = Instant::now(); let results: Vec<_> = txs.into_iter() .map(|tx| self.apply_debits(&tx, bals).map(|_| tx)) .collect(); // Calling collect() here forces all debits to complete before moving on. - info!("debits: {:?}", now.elapsed()); + let debits = now.elapsed(); + let now = Instant::now(); let res: Vec<_> = results .into_iter() @@ -365,14 +380,20 @@ impl Bank { }) }) .collect(); + info!("debits: {} us credits: {:?} us tx: {}", + duration_as_us(&debits), duration_as_us(&now.elapsed()), txs_len); let mut tr_count = 0; for r in &res { if r.is_ok() { tr_count += 1; + } else { + info!("tx error: {:?}", r); } } + //let tr_count = txs.len(); self.transaction_count.fetch_add(tr_count, Ordering::Relaxed); - info!("credits: {:?}", now.elapsed()); + //debug!("credits: {:?}", now.elapsed()); + //let res = txs.into_iter().map(|tx| Ok(tx)).collect(); res } @@ -780,8 +801,8 @@ mod bench { .collect(); bencher.iter(|| { // Since benchmarker runs this multiple times, we need to clear the signatures. - for sigs in bank.last_ids.read().unwrap().iter() { - sigs.1.write().unwrap().clear(); + for (_, sigs) in bank.last_ids_sigs.write().unwrap().iter_mut() { + sigs.clear(); } assert!( diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 18ee0c63fb..ddf47dbb57 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -327,7 +327,7 @@ mod bench { let (verified_sender, verified_receiver) = channel(); let (signal_sender, signal_receiver) = channel(); let packet_recycler = PacketRecycler::default(); - let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions, tx) + let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions, 192) .into_iter() .map(|x| { let len = (*x).read().unwrap().packets.len(); diff --git a/src/timing.rs b/src/timing.rs index 0adcf91cb9..3ae7bf8c78 100644 --- a/src/timing.rs +++ b/src/timing.rs @@ -2,6 +2,10 @@ use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; +pub fn duration_as_us(d: &Duration) -> u64 { + return (d.as_secs() * 1000 * 1000) + (d.subsec_nanos() as u64 / 1_000); +} + pub fn duration_as_ms(d: &Duration) -> u64 { return (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000); }