From 0c60fdd2ce6d45626f333b7311aa74a9a8bac4f9 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 4 Apr 2018 12:33:00 -0600 Subject: [PATCH 1/6] Make accountant thread-safe Before this change, parallel transaction processing required locking the full accountant. Since we only call one method, process_verified_transaction, the global lock equates to doing no parallelization at all. With this change, we only lock the data that's being written to. --- src/accountant.rs | 89 ++++++++++++++++++++++++--------------------- src/bin/testnode.rs | 2 +- 2 files changed, 48 insertions(+), 43 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index 9ed88571ce..8b4b13931c 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -12,6 +12,7 @@ use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet}; use std::result; +use std::sync::RwLock; use transaction::Transaction; #[derive(Debug, PartialEq, Eq)] @@ -28,11 +29,11 @@ fn apply_payment(balances: &mut HashMap, payment: &Payment) { } pub struct Accountant { - balances: HashMap, - pending: HashMap, - signatures: HashSet, - time_sources: HashSet, - last_time: DateTime, + balances: RwLock>, + pending: RwLock>, + signatures: RwLock>, + time_sources: RwLock>, + last_time: RwLock>, } impl Accountant { @@ -41,11 +42,11 @@ impl Accountant { let mut balances = HashMap::new(); apply_payment(&mut balances, deposit); Accountant { - balances, - pending: HashMap::new(), - signatures: HashSet::new(), - time_sources: HashSet::new(), - last_time: Utc.timestamp(0, 0), + balances: RwLock::new(balances), + pending: RwLock::new(HashMap::new()), + signatures: RwLock::new(HashSet::new()), + time_sources: RwLock::new(HashSet::new()), + last_time: RwLock::new(Utc.timestamp(0, 0)), } } @@ -58,16 +59,16 @@ impl Accountant { Self::new_from_deposit(&deposit) } - fn reserve_signature(&mut self, sig: &Signature) -> bool { - if self.signatures.contains(sig) { + fn reserve_signature(&self, sig: &Signature) -> bool { + if self.signatures.read().unwrap().contains(sig) { return false; } - self.signatures.insert(*sig); + self.signatures.write().unwrap().insert(*sig); true } /// Process a Transaction that has already been verified. - pub fn process_verified_transaction(&mut self, tr: &Transaction) -> Result<()> { + pub fn process_verified_transaction(&self, tr: &Transaction) -> Result<()> { if self.get_balance(&tr.from).unwrap_or(0) < tr.tokens { return Err(AccountingError::InsufficientFunds); } @@ -76,28 +77,28 @@ impl Accountant { return Err(AccountingError::InvalidTransferSignature); } - if let Some(x) = self.balances.get_mut(&tr.from) { + if let Some(x) = self.balances.write().unwrap().get_mut(&tr.from) { *x -= tr.tokens; } let mut plan = tr.plan.clone(); - plan.apply_witness(&Witness::Timestamp(self.last_time)); + plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap())); if let Some(ref payment) = plan.final_payment() { - apply_payment(&mut self.balances, payment); + apply_payment(&mut self.balances.write().unwrap(), payment); } else { - self.pending.insert(tr.sig, plan); + self.pending.write().unwrap().insert(tr.sig, plan); } Ok(()) } /// Process a Witness Signature that has already been verified. - fn process_verified_sig(&mut self, from: PublicKey, tx_sig: Signature) -> Result<()> { - if let Occupied(mut e) = self.pending.entry(tx_sig) { + fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { + if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) { e.get_mut().apply_witness(&Witness::Signature(from)); if let Some(ref payment) = e.get().final_payment() { - apply_payment(&mut self.balances, payment); + apply_payment(&mut self.balances.write().unwrap(), payment); e.remove_entry(); } }; @@ -106,16 +107,16 @@ impl Accountant { } /// Process a Witness Timestamp that has already been verified. - fn process_verified_timestamp(&mut self, from: PublicKey, dt: DateTime) -> Result<()> { + fn process_verified_timestamp(&self, from: PublicKey, dt: DateTime) -> Result<()> { // If this is the first timestamp we've seen, it probably came from the genesis block, // so we'll trust it. - if self.last_time == Utc.timestamp(0, 0) { - self.time_sources.insert(from); + if *self.last_time.read().unwrap() == Utc.timestamp(0, 0) { + self.time_sources.write().unwrap().insert(from); } - if self.time_sources.contains(&from) { - if dt > self.last_time { - self.last_time = dt; + if self.time_sources.read().unwrap().contains(&from) { + if dt > *self.last_time.read().unwrap() { + *self.last_time.write().unwrap() = dt; } } else { return Ok(()); @@ -123,23 +124,27 @@ impl Accountant { // Check to see if any timelocked transactions can be completed. let mut completed = vec![]; - for (key, plan) in &mut self.pending { - plan.apply_witness(&Witness::Timestamp(self.last_time)); + + // Hold 'pending' write lock until the end of this function. Otherwise another thread can + // double-spend if it enters before the modified plan is removed from 'pending'. + let mut pending = self.pending.write().unwrap(); + for (key, plan) in pending.iter_mut() { + plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap())); if let Some(ref payment) = plan.final_payment() { - apply_payment(&mut self.balances, payment); + apply_payment(&mut self.balances.write().unwrap(), payment); completed.push(key.clone()); } } for key in completed { - self.pending.remove(&key); + pending.remove(&key); } Ok(()) } /// Process an Transaction or Witness that has already been verified. - pub fn process_verified_event(&mut self, event: &Event) -> Result<()> { + pub fn process_verified_event(&self, event: &Event) -> Result<()> { match *event { Event::Transaction(ref tr) => self.process_verified_transaction(tr), Event::Signature { from, tx_sig, .. } => self.process_verified_sig(from, tx_sig), @@ -150,7 +155,7 @@ impl Accountant { /// Create, sign, and process a Transaction from `keypair` to `to` of /// `n` tokens where `last_id` is the last Entry ID observed by the client. pub fn transfer( - &mut self, + &self, n: i64, keypair: &KeyPair, to: PublicKey, @@ -165,7 +170,7 @@ impl Accountant { /// to `to` of `n` tokens on `dt` where `last_id` is the last Entry ID /// observed by the client. pub fn transfer_on_date( - &mut self, + &self, n: i64, keypair: &KeyPair, to: PublicKey, @@ -178,7 +183,7 @@ impl Accountant { } pub fn get_balance(&self, pubkey: &PublicKey) -> Option { - self.balances.get(pubkey).cloned() + self.balances.read().unwrap().get(pubkey).cloned() } } @@ -191,7 +196,7 @@ mod tests { fn test_accountant() { let alice = Mint::new(10_000); let bob_pubkey = KeyPair::new().pubkey(); - let mut acc = Accountant::new(&alice); + let acc = Accountant::new(&alice); acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) .unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); @@ -204,7 +209,7 @@ mod tests { #[test] fn test_invalid_transfer() { let alice = Mint::new(11_000); - let mut acc = Accountant::new(&alice); + let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) .unwrap(); @@ -221,7 +226,7 @@ mod tests { #[test] fn test_transfer_to_newb() { let alice = Mint::new(10_000); - let mut acc = Accountant::new(&alice); + let acc = Accountant::new(&alice); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); acc.transfer(500, &alice_keypair, bob_pubkey, alice.last_id()) @@ -232,7 +237,7 @@ mod tests { #[test] fn test_transfer_on_date() { let alice = Mint::new(1); - let mut acc = Accountant::new(&alice); + let acc = Accountant::new(&alice); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); @@ -258,7 +263,7 @@ mod tests { #[test] fn test_transfer_after_date() { let alice = Mint::new(1); - let mut acc = Accountant::new(&alice); + let acc = Accountant::new(&alice); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); @@ -275,7 +280,7 @@ mod tests { #[test] fn test_cancel_transfer() { let alice = Mint::new(1); - let mut acc = Accountant::new(&alice); + let acc = Accountant::new(&alice); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); @@ -301,7 +306,7 @@ mod tests { #[test] fn test_duplicate_event_signature() { let alice = Mint::new(1); - let mut acc = Accountant::new(&alice); + let acc = Accountant::new(&alice); let sig = Signature::default(); assert!(acc.reserve_signature(&sig)); assert!(!acc.reserve_signature(&sig)); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 40047a9ffd..bbce2d91ee 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -32,7 +32,7 @@ fn main() { None }; - let mut acc = Accountant::new_from_deposit(&deposit.unwrap()); + let acc = Accountant::new_from_deposit(&deposit.unwrap()); let mut last_id = entry1.id; for entry in entries { From 014bdaa355052c1c3790626e299e710b91255e76 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 4 Apr 2018 10:28:12 -0600 Subject: [PATCH 2/6] Add benchmark for parallel transaction processing --- src/accountant.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/accountant.rs b/src/accountant.rs index 8b4b13931c..ec8dcc5f40 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -312,3 +312,32 @@ mod tests { assert!(!acc.reserve_signature(&sig)); } } + +#[cfg(all(feature = "unstable", test))] +mod bench { + extern crate test; + use self::test::Bencher; + use accountant::*; + use rayon::prelude::*; + use signature::KeyPairUtil; + + #[bench] + fn process_verified_event_bench(bencher: &mut Bencher) { + let alice = Mint::new(100_000_000); + let alice_keypair = alice.keypair(); + let last_id = Hash::default(); + let transactions: Vec<_> = (0..4096) + .into_par_iter() + .map(|_| { + let rando_pubkey = KeyPair::new().pubkey(); + Transaction::new(&alice_keypair, rando_pubkey, 1, last_id) + }) + .collect(); + bencher.iter(|| { + let acc = Accountant::new(&alice); + transactions.par_iter().for_each(move |tr| { + acc.process_verified_transaction(tr).unwrap(); + }); + }); + } +} From dc2ec925d7431ea580cf1c1cf4c974fbd0a5ade4 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 4 Apr 2018 16:01:43 -0600 Subject: [PATCH 3/6] Better test --- src/accountant.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index ec8dcc5f40..72d5c90db8 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -323,19 +323,22 @@ mod bench { #[bench] fn process_verified_event_bench(bencher: &mut Bencher) { - let alice = Mint::new(100_000_000); - let alice_keypair = alice.keypair(); - let last_id = Hash::default(); + let mint = Mint::new(100_000_000); + let acc = Accountant::new(&mint); + // Create transactions between unrelated parties. let transactions: Vec<_> = (0..4096) .into_par_iter() .map(|_| { - let rando_pubkey = KeyPair::new().pubkey(); - Transaction::new(&alice_keypair, rando_pubkey, 1, last_id) + let rando0 = KeyPair::new(); + let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id()); + acc.process_verified_transaction(&tr).unwrap(); + let rando1 = KeyPair::new(); + Transaction::new(&rando0, rando1.pubkey(), 1, mint.last_id()) }) .collect(); bencher.iter(|| { - let acc = Accountant::new(&alice); - transactions.par_iter().for_each(move |tr| { + acc.signatures.write().unwrap().clear(); + transactions.par_iter().for_each(|tr| { acc.process_verified_transaction(tr).unwrap(); }); }); From 76679ffb92434d95b366c4df3a399d2b708f7e1f Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 4 Apr 2018 16:31:11 -0600 Subject: [PATCH 4/6] Per-cell locking This allows us to use read-locks for balances most of the time. We only lock the full table if we need to add one. --- src/accountant.rs | 40 ++++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index 72d5c90db8..efa56ca1be 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -24,12 +24,18 @@ pub enum AccountingError { pub type Result = result::Result; /// Commit funds to the 'to' party. -fn apply_payment(balances: &mut HashMap, payment: &Payment) { - *balances.entry(payment.to).or_insert(0) += payment.tokens; +fn apply_payment(balances: &RwLock>>, payment: &Payment) { + if balances.read().unwrap().contains_key(&payment.to) { + let bals = balances.read().unwrap(); + *bals[&payment.to].write().unwrap() += payment.tokens; + } else { + let mut bals = balances.write().unwrap(); + bals.insert(payment.to, RwLock::new(payment.tokens)); + } } pub struct Accountant { - balances: RwLock>, + balances: RwLock>>, pending: RwLock>, signatures: RwLock>, time_sources: RwLock>, @@ -39,10 +45,10 @@ pub struct Accountant { impl Accountant { /// Create an Accountant using a deposit. pub fn new_from_deposit(deposit: &Payment) -> Self { - let mut balances = HashMap::new(); - apply_payment(&mut balances, deposit); + let balances = RwLock::new(HashMap::new()); + apply_payment(&balances, deposit); Accountant { - balances: RwLock::new(balances), + balances, pending: RwLock::new(HashMap::new()), signatures: RwLock::new(HashSet::new()), time_sources: RwLock::new(HashSet::new()), @@ -77,15 +83,15 @@ impl Accountant { return Err(AccountingError::InvalidTransferSignature); } - if let Some(x) = self.balances.write().unwrap().get_mut(&tr.from) { - *x -= tr.tokens; + if let Some(x) = self.balances.read().unwrap().get(&tr.from) { + *x.write().unwrap() -= tr.tokens; } let mut plan = tr.plan.clone(); plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap())); if let Some(ref payment) = plan.final_payment() { - apply_payment(&mut self.balances.write().unwrap(), payment); + apply_payment(&self.balances, payment); } else { self.pending.write().unwrap().insert(tr.sig, plan); } @@ -98,7 +104,7 @@ impl Accountant { if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) { e.get_mut().apply_witness(&Witness::Signature(from)); if let Some(ref payment) = e.get().final_payment() { - apply_payment(&mut self.balances.write().unwrap(), payment); + apply_payment(&self.balances, payment); e.remove_entry(); } }; @@ -131,7 +137,7 @@ impl Accountant { for (key, plan) in pending.iter_mut() { plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap())); if let Some(ref payment) = plan.final_payment() { - apply_payment(&mut self.balances.write().unwrap(), payment); + apply_payment(&self.balances, payment); completed.push(key.clone()); } } @@ -183,7 +189,8 @@ impl Accountant { } pub fn get_balance(&self, pubkey: &PublicKey) -> Option { - self.balances.read().unwrap().get(pubkey).cloned() + let bals = self.balances.read().unwrap(); + bals.get(pubkey).map(|x| *x.read().unwrap()) } } @@ -329,15 +336,24 @@ mod bench { let transactions: Vec<_> = (0..4096) .into_par_iter() .map(|_| { + // Seed the 'from' account. let rando0 = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id()); acc.process_verified_transaction(&tr).unwrap(); + + // Seed the 'to' account. let rando1 = KeyPair::new(); + let tr = Transaction::new(&rando0, rando1.pubkey(), 1, mint.last_id()); + acc.process_verified_transaction(&tr).unwrap(); + + // Finally, return a transaction that's unique Transaction::new(&rando0, rando1.pubkey(), 1, mint.last_id()) }) .collect(); bencher.iter(|| { + // Since benchmarker runs this multiple times, we need to clear the signatures. acc.signatures.write().unwrap().clear(); + transactions.par_iter().for_each(|tr| { acc.process_verified_transaction(tr).unwrap(); }); From 3cfb07ea389866d090819b5df4d80ab0bde4ad83 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 4 Apr 2018 17:03:18 -0600 Subject: [PATCH 5/6] Sort signatures by last_id This will allow for additional concurrency as well as give the server a means of garbage-collecting old signatures. --- src/accountant.rs | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index efa56ca1be..b4bbeb1e5b 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -10,7 +10,7 @@ use mint::Mint; use plan::{Payment, Plan, Witness}; use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::result; use std::sync::RwLock; use transaction::Transaction; @@ -37,7 +37,7 @@ fn apply_payment(balances: &RwLock>>, payment: &P pub struct Accountant { balances: RwLock>>, pending: RwLock>, - signatures: RwLock>, + last_ids: RwLock>)>>, time_sources: RwLock>, last_time: RwLock>, } @@ -50,7 +50,7 @@ impl Accountant { Accountant { balances, pending: RwLock::new(HashMap::new()), - signatures: RwLock::new(HashSet::new()), + last_ids: RwLock::new(VecDeque::new()), time_sources: RwLock::new(HashSet::new()), last_time: RwLock::new(Utc.timestamp(0, 0)), } @@ -65,11 +65,27 @@ impl Accountant { Self::new_from_deposit(&deposit) } - fn reserve_signature(&self, sig: &Signature) -> bool { - if self.signatures.read().unwrap().contains(sig) { + fn reserve_signature(signatures: &RwLock>, sig: &Signature) -> bool { + if signatures.read().unwrap().contains(sig) { return false; } - self.signatures.write().unwrap().insert(*sig); + signatures.write().unwrap().insert(*sig); + true + } + + fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool { + if let Some(entry) = self.last_ids + .read() + .unwrap() + .iter() + .rev() + .find(|x| x.0 == *last_id) + { + return Self::reserve_signature(&entry.1, sig); + } + let sigs = RwLock::new(HashSet::new()); + Self::reserve_signature(&sigs, sig); + self.last_ids.write().unwrap().push_back((*last_id, sigs)); true } @@ -79,7 +95,7 @@ impl Accountant { return Err(AccountingError::InsufficientFunds); } - if !self.reserve_signature(&tr.sig) { + if !self.reserve_signature_with_last_id(&tr.sig, &tr.last_id) { return Err(AccountingError::InvalidTransferSignature); } @@ -315,8 +331,9 @@ mod tests { let alice = Mint::new(1); let acc = Accountant::new(&alice); let sig = Signature::default(); - assert!(acc.reserve_signature(&sig)); - assert!(!acc.reserve_signature(&sig)); + let last_id = Hash::default(); + assert!(acc.reserve_signature_with_last_id(&sig, &last_id)); + assert!(!acc.reserve_signature_with_last_id(&sig, &last_id)); } } @@ -352,7 +369,7 @@ mod bench { .collect(); bencher.iter(|| { // Since benchmarker runs this multiple times, we need to clear the signatures. - acc.signatures.write().unwrap().clear(); + acc.last_ids.write().unwrap().clear(); transactions.par_iter().for_each(|tr| { acc.process_verified_transaction(tr).unwrap(); From 8d425e127bf5b752ffc5a124f7a05fb9495b6ce8 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 4 Apr 2018 17:29:22 -0600 Subject: [PATCH 6/6] Update benchmark to avoid write locks in sig duplicate detection --- src/accountant.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index b4bbeb1e5b..1b1b281788 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -344,6 +344,8 @@ mod bench { use accountant::*; use rayon::prelude::*; use signature::KeyPairUtil; + use hash::hash; + use bincode::serialize; #[bench] fn process_verified_event_bench(bencher: &mut Bencher) { @@ -352,24 +354,27 @@ mod bench { // Create transactions between unrelated parties. let transactions: Vec<_> = (0..4096) .into_par_iter() - .map(|_| { + .map(|i| { // Seed the 'from' account. let rando0 = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id()); acc.process_verified_transaction(&tr).unwrap(); - // Seed the 'to' account. + // Seed the 'to' account and a cell for its signature. + let last_id = hash(&serialize(&i).unwrap()); // Unique hash let rando1 = KeyPair::new(); - let tr = Transaction::new(&rando0, rando1.pubkey(), 1, mint.last_id()); + let tr = Transaction::new(&rando0, rando1.pubkey(), 1, last_id); acc.process_verified_transaction(&tr).unwrap(); // Finally, return a transaction that's unique - Transaction::new(&rando0, rando1.pubkey(), 1, mint.last_id()) + Transaction::new(&rando0, rando1.pubkey(), 1, last_id) }) .collect(); bencher.iter(|| { // Since benchmarker runs this multiple times, we need to clear the signatures. - acc.last_ids.write().unwrap().clear(); + for (_, sigs) in acc.last_ids.read().unwrap().iter() { + sigs.write().unwrap().clear(); + } transactions.par_iter().for_each(|tr| { acc.process_verified_transaction(tr).unwrap();