Enable parallelized accountant

This commit is contained in:
Greg Fitzgerald 2018-04-11 11:17:00 -06:00
parent ab2093926a
commit 6e43e7a146
2 changed files with 58 additions and 35 deletions

View File

@ -152,12 +152,17 @@ impl Accountant {
} }
/// Process a batch of verified transactions. /// Process a batch of verified transactions.
pub fn process_verified_transactions(&self, trs: &[Transaction]) -> Vec<Result<()>> { pub fn process_verified_transactions(&self, trs: Vec<Transaction>) -> Vec<Result<Transaction>> {
// Run all debits first to filter out any transactions that can't be processed // Run all debits first to filter out any transactions that can't be processed
// in parallel deterministically. // in parallel deterministically.
trs.par_iter() trs.into_par_iter()
.map(|tr| self.process_verified_transaction_debits(tr).map(|_| tr)) .map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr))
.map(|result| result.map(|tr| self.process_verified_transaction_credits(tr))) .map(|result| {
result.map(|tr| {
self.process_verified_transaction_credits(&tr);
tr
})
})
.collect() .collect()
} }

View File

@ -21,7 +21,7 @@ use std::collections::VecDeque;
use std::io::Write; use std::io::Write;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, SendError, Sender}; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
@ -82,28 +82,18 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
} }
/// Process Request items sent by clients. /// Process Request items sent by clients.
pub fn log_verified_request(&mut self, msg: Request, verify: u8) -> Option<Response> { pub fn process_request(
&mut self,
msg: Request,
rsp_addr: SocketAddr,
) -> Option<(Response, SocketAddr)> {
match msg { match msg {
Request::Transaction(_) if verify == 0 => {
trace!("Transaction failed sigverify");
None
}
Request::Transaction(tr) => {
if let Err(err) = self.acc.process_verified_transaction(&tr) {
trace!("Transaction error: {:?}", err);
} else if let Err(SendError(_)) = self.historian
.sender
.send(Signal::Event(Event::Transaction(tr.clone())))
{
error!("Channel send error");
}
None
}
Request::GetBalance { key } => { Request::GetBalance { key } => {
let val = self.acc.get_balance(&key); let val = self.acc.get_balance(&key);
Some(Response::Balance { key, val }) Some((Response::Balance { key, val }, rsp_addr))
} }
Request::GetLastId => Some(Response::LastId { id: self.sync() }), Request::GetLastId => Some((Response::LastId { id: self.sync() }, rsp_addr)),
Request::Transaction(_) => unreachable!(),
} }
} }
@ -155,18 +145,46 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
.collect() .collect()
} }
fn process_packets( /// Split Request list into verified transactions and the rest
obj: &Arc<Mutex<AccountantSkel<W>>>, fn partition_requests(
req_vers: Vec<(Request, SocketAddr, u8)>, req_vers: Vec<(Request, SocketAddr, u8)>,
) -> Vec<(Response, SocketAddr)> { ) -> (Vec<Transaction>, Vec<(Request, SocketAddr)>) {
req_vers let mut trs = vec![];
.into_iter() let mut reqs = vec![];
.filter_map(|(req, rsp_addr, v)| { for (msg, rsp_addr, verify) in req_vers {
let mut skel = obj.lock().unwrap(); match msg {
skel.log_verified_request(req, v) Request::Transaction(tr) => {
.map(|resp| (resp, rsp_addr)) if verify != 0 {
}) trs.push(tr);
.collect() }
}
_ => reqs.push((msg, rsp_addr)),
}
}
(trs, reqs)
}
fn process_packets(
&mut self,
req_vers: Vec<(Request, SocketAddr, u8)>,
) -> Result<Vec<(Response, SocketAddr)>> {
let (trs, reqs) = Self::partition_requests(req_vers);
// Process the transactions in parallel and then log the successful ones.
for result in self.acc.process_verified_transactions(trs) {
if let Ok(tr) = result {
self.historian
.sender
.send(Signal::Event(Event::Transaction(tr)))?;
}
}
// Process the remaining requests serially.
let rsps = reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect();
Ok(rsps)
} }
fn serialize_response( fn serialize_response(
@ -213,7 +231,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
.filter(|x| x.0.verify()) .filter(|x| x.0.verify())
.collect(); .collect();
let rsps = Self::process_packets(obj, req_vers); let rsps = obj.lock().unwrap().process_packets(req_vers)?;
let blobs = Self::serialize_responses(rsps, blob_recycler)?; let blobs = Self::serialize_responses(rsps, blob_recycler)?;
if !blobs.is_empty() { if !blobs.is_empty() {
//don't wake up the other side if there is nothing //don't wake up the other side if there is nothing