From 491a530d90a976f68d751688a43efdff0a2101d4 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Apr 2018 22:13:54 -0600 Subject: [PATCH] Support parallelization of arbitrary transactions Still assumes witnesses are processed serially afterward. --- src/accountant.rs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index b031d4389..5f599bf93 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -10,6 +10,7 @@ use event::Event; use hash::Hash; use mint::Mint; use plan::{Payment, Plan, Witness}; +use rayon::prelude::*; use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet, VecDeque}; @@ -150,6 +151,16 @@ impl Accountant { Ok(()) } + /// Process a batch of verified transactions. + pub fn process_verified_transactions(&self, trs: &[Transaction]) -> Vec> { + // Run all debits first to filter out any transactions that can't be processed + // in parallel deterministically. + trs.par_iter() + .map(|tr| self.process_verified_transaction_debits(tr).map(|_| tr)) + .map(|result| result.map(|tr| self.process_verified_transaction_credits(tr))) + .collect() + } + /// Process a Witness Signature that has already been verified. fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) { @@ -401,7 +412,6 @@ mod bench { extern crate test; use self::test::Bencher; use accountant::*; - use rayon::prelude::*; use signature::KeyPairUtil; use hash::hash; use bincode::serialize; @@ -437,9 +447,11 @@ mod bench { sigs.write().unwrap().clear(); } - transactions.par_iter().for_each(|tr| { - acc.process_verified_transaction(tr).unwrap(); - }); + assert!( + acc.process_verified_transactions(&transactions) + .iter() + .all(|x| x.is_ok()) + ); }); } }