Do request verification in parallel, and then process the verified requests

This commit is contained in:
Greg Fitzgerald 2018-03-29 13:18:08 -06:00
parent c59c38e50e
commit 22f5985f1b
2 changed files with 17 additions and 5 deletions

View File

@ -116,7 +116,7 @@ impl Accountant {
} }
/// Process a Transaction that has already been verified. /// Process a Transaction that has already been verified.
fn process_verified_transaction( pub fn process_verified_transaction(
self: &mut Self, self: &mut Self,
tr: &Transaction, tr: &Transaction,
allow_deposits: bool, allow_deposits: bool,

View File

@ -19,6 +19,7 @@ use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer; use streamer;
use transaction::Transaction; use transaction::Transaction;
use rayon::prelude::*;
pub struct AccountantSkel<W: Write + Send + 'static> { pub struct AccountantSkel<W: Write + Send + 'static> {
pub acc: Accountant, pub acc: Accountant,
@ -34,8 +35,19 @@ pub enum Request {
GetId { is_last: bool }, GetId { is_last: bool },
} }
impl Request {
/// Verify the request is valid.
pub fn verify(&self) -> bool {
match *self {
Request::Transaction(ref tr) => tr.verify(),
_ => true,
}
}
}
/// Parallel verfication of a batch of requests.
fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, SocketAddr)> { fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, SocketAddr)> {
reqs reqs.into_par_iter().filter({ |x| x.0.verify() }).collect()
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -66,10 +78,10 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
} }
/// Process Request items sent by clients. /// Process Request items sent by clients.
pub fn process_request(self: &mut Self, msg: Request) -> Option<Response> { pub fn process_verified_request(self: &mut Self, msg: Request) -> Option<Response> {
match msg { match msg {
Request::Transaction(tr) => { Request::Transaction(tr) => {
if let Err(err) = self.acc.process_transaction(tr) { if let Err(err) = self.acc.process_verified_transaction(&tr, false) {
eprintln!("Transaction error: {:?}", err); eprintln!("Transaction error: {:?}", err);
} }
None None
@ -114,7 +126,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
let mut num = 0; let mut num = 0;
let mut ursps = rsps.write().unwrap(); let mut ursps = rsps.write().unwrap();
for (req, rsp_addr) in reqs { for (req, rsp_addr) in reqs {
if let Some(resp) = obj.lock().unwrap().process_request(req) { if let Some(resp) = obj.lock().unwrap().process_verified_request(req) {
if ursps.responses.len() <= num { if ursps.responses.len() <= num {
ursps ursps
.responses .responses