diff --git a/src/accountant.rs b/src/accountant.rs index 01c59f308..8fb576bf8 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -18,7 +18,7 @@ use std::result; use std::sync::RwLock; use transaction::Transaction; -const MAX_ENTRY_IDS: usize = 1024 * 4; +pub const MAX_ENTRY_IDS: usize = 1024 * 4; #[derive(Debug, PartialEq, Eq)] pub enum AccountingError { @@ -152,12 +152,17 @@ impl Accountant { } /// Process a batch of verified transactions. - pub fn process_verified_transactions(&self, trs: &[Transaction]) -> Vec> { + pub fn process_verified_transactions(&self, trs: Vec) -> Vec> { // Run all debits first to filter out any transactions that can't be processed // in parallel deterministically. - trs.par_iter() - .map(|tr| self.process_verified_transaction_debits(tr).map(|_| tr)) - .map(|result| result.map(|tr| self.process_verified_transaction_credits(tr))) + trs.into_par_iter() + .map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr)) + .map(|result| { + result.map(|tr| { + self.process_verified_transaction_credits(&tr); + tr + }) + }) .collect() } @@ -448,7 +453,7 @@ mod bench { } assert!( - acc.process_verified_transactions(&transactions) + acc.process_verified_transactions(transactions.clone()) .iter() .all(|x| x.is_ok()) ); diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 47632cd2e..814736663 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -21,7 +21,7 @@ use std::collections::VecDeque; use std::io::Write; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, SendError, Sender}; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; @@ -82,28 +82,18 @@ impl AccountantSkel { } /// Process Request items sent by clients. - pub fn log_verified_request(&mut self, msg: Request, verify: u8) -> Option { + pub fn process_request( + &mut self, + msg: Request, + rsp_addr: SocketAddr, + ) -> Option<(Response, SocketAddr)> { match msg { - Request::Transaction(_) if verify == 0 => { - trace!("Transaction failed sigverify"); - None - } - Request::Transaction(tr) => { - if let Err(err) = self.acc.process_verified_transaction(&tr) { - trace!("Transaction error: {:?}", err); - } else if let Err(SendError(_)) = self.historian - .sender - .send(Signal::Event(Event::Transaction(tr.clone()))) - { - error!("Channel send error"); - } - None - } Request::GetBalance { key } => { let val = self.acc.get_balance(&key); - Some(Response::Balance { key, val }) + Some((Response::Balance { key, val }, rsp_addr)) } - Request::GetLastId => Some(Response::LastId { id: self.sync() }), + Request::GetLastId => Some((Response::LastId { id: self.sync() }, rsp_addr)), + Request::Transaction(_) => unreachable!(), } } @@ -155,18 +145,46 @@ impl AccountantSkel { .collect() } - fn process_packets( - obj: &Arc>>, + /// Split Request list into verified transactions and the rest + fn partition_requests( req_vers: Vec<(Request, SocketAddr, u8)>, - ) -> Vec<(Response, SocketAddr)> { - req_vers - .into_iter() - .filter_map(|(req, rsp_addr, v)| { - let mut skel = obj.lock().unwrap(); - skel.log_verified_request(req, v) - .map(|resp| (resp, rsp_addr)) - }) - .collect() + ) -> (Vec, Vec<(Request, SocketAddr)>) { + let mut trs = vec![]; + let mut reqs = vec![]; + for (msg, rsp_addr, verify) in req_vers { + match msg { + Request::Transaction(tr) => { + if verify != 0 { + trs.push(tr); + } + } + _ => reqs.push((msg, rsp_addr)), + } + } + (trs, reqs) + } + + fn process_packets( + &mut self, + req_vers: Vec<(Request, SocketAddr, u8)>, + ) -> Result> { + let (trs, reqs) = Self::partition_requests(req_vers); + + // Process the transactions in parallel and then log the successful ones. + for result in self.acc.process_verified_transactions(trs) { + if let Ok(tr) = result { + self.historian + .sender + .send(Signal::Event(Event::Transaction(tr)))?; + } + } + + // Process the remaining requests serially. + let rsps = reqs.into_iter() + .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) + .collect(); + + Ok(rsps) } fn serialize_response( @@ -213,7 +231,7 @@ impl AccountantSkel { .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) .filter(|x| x.0.verify()) .collect(); - let rsps = Self::process_packets(obj, req_vers); + let rsps = obj.lock().unwrap().process_packets(req_vers)?; let blobs = Self::serialize_responses(rsps, blob_recycler)?; if !blobs.is_empty() { //don't wake up the other side if there is nothing @@ -286,3 +304,79 @@ mod tests { assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET)); } } + +#[cfg(all(feature = "unstable", test))] +mod bench { + extern crate test; + use self::test::Bencher; + use accountant_skel::*; + use accountant::{Accountant, MAX_ENTRY_IDS}; + use signature::{KeyPair, KeyPairUtil}; + use mint::Mint; + use transaction::Transaction; + use std::collections::HashSet; + use std::io::sink; + use std::time::Instant; + use bincode::serialize; + use hash::hash; + + #[bench] + fn process_packets_bench(_bencher: &mut Bencher) { + let mint = Mint::new(100_000_000); + let acc = Accountant::new(&mint); + let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address"); + // Create transactions between unrelated parties. + let txs = 100_000; + let last_ids: Mutex> = Mutex::new(HashSet::new()); + let transactions: Vec<_> = (0..txs) + .into_par_iter() + .map(|i| { + // Seed the 'to' account and a cell for its signature. + let dummy_id = i % (MAX_ENTRY_IDS as i32); + let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash + { + let mut last_ids = last_ids.lock().unwrap(); + if !last_ids.contains(&last_id) { + last_ids.insert(last_id); + acc.register_entry_id(&last_id); + } + } + + // Seed the 'from' account. + let rando0 = KeyPair::new(); + let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); + acc.process_verified_transaction(&tr).unwrap(); + + let rando1 = KeyPair::new(); + let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); + acc.process_verified_transaction(&tr).unwrap(); + + // Finally, return a transaction that's unique + Transaction::new(&rando0, rando1.pubkey(), 1, last_id) + }) + .collect(); + + let req_vers = transactions + .into_iter() + .map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8)) + .collect(); + + let historian = Historian::new(&mint.last_id(), None); + let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian); + + let now = Instant::now(); + assert!(skel.process_packets(req_vers).is_ok()); + let duration = now.elapsed(); + let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; + let tps = txs as f64 / sec; + + // Ensure that all transactions were successfully logged. + skel.historian.sender.send(Signal::Tick).unwrap(); + drop(skel.historian.sender); + let entries: Vec = skel.historian.receiver.iter().collect(); + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].events.len(), txs as usize); + + println!("{} tps", tps); + } +}