diff --git a/src/accountant.rs b/src/accountant.rs index 01c59f308..a76d54a7b 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -152,12 +152,17 @@ impl Accountant { } /// Process a batch of verified transactions. - pub fn process_verified_transactions(&self, trs: &[Transaction]) -> Vec> { + pub fn process_verified_transactions(&self, trs: Vec) -> Vec> { // Run all debits first to filter out any transactions that can't be processed // in parallel deterministically. - trs.par_iter() - .map(|tr| self.process_verified_transaction_debits(tr).map(|_| tr)) - .map(|result| result.map(|tr| self.process_verified_transaction_credits(tr))) + trs.into_par_iter() + .map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr)) + .map(|result| { + result.map(|tr| { + self.process_verified_transaction_credits(&tr); + tr + }) + }) .collect() } diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 47632cd2e..0ab7d2cd2 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -21,7 +21,7 @@ use std::collections::VecDeque; use std::io::Write; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, SendError, Sender}; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; @@ -82,28 +82,18 @@ impl AccountantSkel { } /// Process Request items sent by clients. - pub fn log_verified_request(&mut self, msg: Request, verify: u8) -> Option { + pub fn process_request( + &mut self, + msg: Request, + rsp_addr: SocketAddr, + ) -> Option<(Response, SocketAddr)> { match msg { - Request::Transaction(_) if verify == 0 => { - trace!("Transaction failed sigverify"); - None - } - Request::Transaction(tr) => { - if let Err(err) = self.acc.process_verified_transaction(&tr) { - trace!("Transaction error: {:?}", err); - } else if let Err(SendError(_)) = self.historian - .sender - .send(Signal::Event(Event::Transaction(tr.clone()))) - { - error!("Channel send error"); - } - None - } Request::GetBalance { key } => { let val = self.acc.get_balance(&key); - Some(Response::Balance { key, val }) + Some((Response::Balance { key, val }, rsp_addr)) } - Request::GetLastId => Some(Response::LastId { id: self.sync() }), + Request::GetLastId => Some((Response::LastId { id: self.sync() }, rsp_addr)), + Request::Transaction(_) => unreachable!(), } } @@ -155,18 +145,46 @@ impl AccountantSkel { .collect() } - fn process_packets( - obj: &Arc>>, + /// Split Request list into verified transactions and the rest + fn partition_requests( req_vers: Vec<(Request, SocketAddr, u8)>, - ) -> Vec<(Response, SocketAddr)> { - req_vers - .into_iter() - .filter_map(|(req, rsp_addr, v)| { - let mut skel = obj.lock().unwrap(); - skel.log_verified_request(req, v) - .map(|resp| (resp, rsp_addr)) - }) - .collect() + ) -> (Vec, Vec<(Request, SocketAddr)>) { + let mut trs = vec![]; + let mut reqs = vec![]; + for (msg, rsp_addr, verify) in req_vers { + match msg { + Request::Transaction(tr) => { + if verify != 0 { + trs.push(tr); + } + } + _ => reqs.push((msg, rsp_addr)), + } + } + (trs, reqs) + } + + fn process_packets( + &mut self, + req_vers: Vec<(Request, SocketAddr, u8)>, + ) -> Result> { + let (trs, reqs) = Self::partition_requests(req_vers); + + // Process the transactions in parallel and then log the successful ones. + for result in self.acc.process_verified_transactions(trs) { + if let Ok(tr) = result { + self.historian + .sender + .send(Signal::Event(Event::Transaction(tr)))?; + } + } + + // Process the remaining requests serially. + let rsps = reqs.into_iter() + .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) + .collect(); + + Ok(rsps) } fn serialize_response( @@ -213,7 +231,7 @@ impl AccountantSkel { .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) .filter(|x| x.0.verify()) .collect(); - let rsps = Self::process_packets(obj, req_vers); + let rsps = obj.lock().unwrap().process_packets(req_vers)?; let blobs = Self::serialize_responses(rsps, blob_recycler)?; if !blobs.is_empty() { //don't wake up the other side if there is nothing