last_ids opt

This commit is contained in:
Stephen Akridge 2018-06-22 10:44:31 -07:00 committed by Greg Fitzgerald
parent e83d76fbd9
commit 00516e50a1
3 changed files with 47 additions and 22 deletions

View File

@ -20,6 +20,7 @@ use std::sync::RwLock;
use transaction::{Instruction, Plan, Transaction};
use std::time::Instant;
use std::cell::Cell;
use timing::duration_as_us;
/// The number of most recent `last_id` values that the bank will track the signatures
/// of. Once the bank discards a `last_id`, it will reject any transactions that use
@ -73,7 +74,9 @@ pub struct Bank {
/// that have been processed using that `last_id`. The bank uses this data to
/// reject transactions with signatures its seen before as well as `last_id`
/// values that are so old that its `last_id` has been pulled out of the queue.
last_ids: RwLock<VecDeque<(Hash, RwLock<HashSet<Signature>>)>>,
last_ids: RwLock<VecDeque<Hash>>,
last_ids_sigs: RwLock<HashMap<Hash, RwLock<HashSet<Signature>>>>,
/// The set of trusted timekeepers. A Timestamp transaction from a `PublicKey`
/// outside this set will be discarded. Note that if validators do not have the
@ -97,6 +100,7 @@ impl Bank {
balances: RwLock::new(HashMap::new()),
pending: RwLock::new(HashMap::new()),
last_ids: RwLock::new(VecDeque::new()),
last_ids_sigs: RwLock::new(HashMap::new()),
time_sources: RwLock::new(HashSet::new()),
last_time: RwLock::new(Utc.timestamp(0, 0)),
transaction_count: AtomicUsize::new(0),
@ -159,12 +163,19 @@ impl Bank {
pub fn last_id(&self) -> Hash {
let last_ids = self.last_ids.read().expect("'last_ids' read lock");
let last_item = last_ids.iter().last().expect("empty 'last_ids' list");
last_item.0
*last_item
}
/// Store the given signature. The bank will reject any transaction with the same signature.
fn reserve_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> Result<()> {
if signatures
let mut sigs_l = signatures.write().unwrap();
if let Some(sig) = sigs_l.get(sig) {
return Err(BankError::DuplicateSiganture(*sig));
}
{
sigs_l.insert(*sig);
}
/*if signatures
.read()
.expect("'signatures' read lock")
.contains(sig)
@ -174,7 +185,7 @@ impl Bank {
signatures
.write()
.expect("'signatures' write lock")
.insert(*sig);
.insert(*sig);*/
Ok(())
}
@ -188,26 +199,23 @@ impl Bank {
/// Forget the given `signature` with `last_id` because the transaction was rejected.
fn forget_signature_with_last_id(&self, signature: &Signature, last_id: &Hash) {
if let Some(entry) = self.last_ids
if let Some(entry) = self.last_ids_sigs
.read()
.expect("'last_ids' read lock in forget_signature_with_last_id")
.iter()
.rev()
.find(|x| x.0 == *last_id)
.get(last_id)
{
Self::forget_signature(&entry.1, signature);
Self::forget_signature(entry, signature);
}
}
fn reserve_signature_with_last_id(&self, signature: &Signature, last_id: &Hash) -> Result<()> {
if let Some(entry) = self.last_ids
if let Some(entry) = self.last_ids_sigs
.read()
.expect("'last_ids' read lock in reserve_signature_with_last_id")
.iter()
.rev()
.find(|x| x.0 == *last_id)
.get(last_id)
{
return Self::reserve_signature(&entry.1, signature);
return Self::reserve_signature(&entry, signature);
//return Ok(());
}
Err(BankError::LastIdNotFound(*last_id))
}
@ -220,10 +228,15 @@ impl Bank {
let mut last_ids = self.last_ids
.write()
.expect("'last_ids' write lock in register_entry_id");
let mut last_ids_sigs = self.last_ids_sigs
.write()
.expect("last_ids_sigs write lock");
if last_ids.len() >= MAX_ENTRY_IDS {
last_ids.pop_front();
let id = last_ids.pop_front().unwrap();
last_ids_sigs.remove(&id);
}
last_ids.push_back((*last_id, RwLock::new(HashSet::new())));
last_ids_sigs.insert(*last_id, RwLock::new(HashSet::new()));
last_ids.push_back(*last_id);
}
pub fn apply_debits(&self, tr: &Transaction, bals: &mut HashMap<PublicKey, i64>) -> Result<()> {
@ -348,13 +361,15 @@ impl Bank {
// Run all debits first to filter out any transactions that can't be processed
// in parallel deterministically.
let bals = &mut self.balances.write().unwrap();
info!("processing Transactions {}", txs.len());
debug!("processing Transactions {}", txs.len());
let txs_len = txs.len();
let now = Instant::now();
let results: Vec<_> = txs.into_iter()
.map(|tx| self.apply_debits(&tx, bals).map(|_| tx))
.collect(); // Calling collect() here forces all debits to complete before moving on.
info!("debits: {:?}", now.elapsed());
let debits = now.elapsed();
let now = Instant::now();
let res: Vec<_> = results
.into_iter()
@ -365,14 +380,20 @@ impl Bank {
})
})
.collect();
info!("debits: {} us credits: {:?} us tx: {}",
duration_as_us(&debits), duration_as_us(&now.elapsed()), txs_len);
let mut tr_count = 0;
for r in &res {
if r.is_ok() {
tr_count += 1;
} else {
info!("tx error: {:?}", r);
}
}
//let tr_count = txs.len();
self.transaction_count.fetch_add(tr_count, Ordering::Relaxed);
info!("credits: {:?}", now.elapsed());
//debug!("credits: {:?}", now.elapsed());
//let res = txs.into_iter().map(|tx| Ok(tx)).collect();
res
}
@ -780,8 +801,8 @@ mod bench {
.collect();
bencher.iter(|| {
// Since benchmarker runs this multiple times, we need to clear the signatures.
for sigs in bank.last_ids.read().unwrap().iter() {
sigs.1.write().unwrap().clear();
for (_, sigs) in bank.last_ids_sigs.write().unwrap().iter_mut() {
sigs.clear();
}
assert!(

View File

@ -327,7 +327,7 @@ mod bench {
let (verified_sender, verified_receiver) = channel();
let (signal_sender, signal_receiver) = channel();
let packet_recycler = PacketRecycler::default();
let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions, tx)
let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions, 192)
.into_iter()
.map(|x| {
let len = (*x).read().unwrap().packets.len();

View File

@ -2,6 +2,10 @@
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
pub fn duration_as_us(d: &Duration) -> u64 {
return (d.as_secs() * 1000 * 1000) + (d.subsec_nanos() as u64 / 1_000);
}
pub fn duration_as_ms(d: &Duration) -> u64 {
return (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000);
}