Merge pull request #92 from garious/par-req-processing
Parallel request verification
This commit is contained in:
commit
f9309b46aa
|
@ -116,7 +116,7 @@ impl Accountant {
|
|||
}
|
||||
|
||||
/// Process a Transaction that has already been verified.
|
||||
fn process_verified_transaction(
|
||||
pub fn process_verified_transaction(
|
||||
self: &mut Self,
|
||||
tr: &Transaction,
|
||||
allow_deposits: bool,
|
||||
|
|
|
@ -11,7 +11,7 @@ use serde_json;
|
|||
use signature::PublicKey;
|
||||
use std::default::Default;
|
||||
use std::io::Write;
|
||||
use std::net::UdpSocket;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
@ -19,6 +19,7 @@ use std::thread::{spawn, JoinHandle};
|
|||
use std::time::Duration;
|
||||
use streamer;
|
||||
use transaction::Transaction;
|
||||
use rayon::prelude::*;
|
||||
|
||||
pub struct AccountantSkel<W: Write + Send + 'static> {
|
||||
pub acc: Accountant,
|
||||
|
@ -34,6 +35,21 @@ pub enum Request {
|
|||
GetId { is_last: bool },
|
||||
}
|
||||
|
||||
impl Request {
|
||||
/// Verify the request is valid.
|
||||
pub fn verify(&self) -> bool {
|
||||
match *self {
|
||||
Request::Transaction(ref tr) => tr.verify(),
|
||||
_ => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parallel verfication of a batch of requests.
|
||||
fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, SocketAddr)> {
|
||||
reqs.into_par_iter().filter({ |x| x.0.verify() }).collect()
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub enum Response {
|
||||
Balance { key: PublicKey, val: Option<i64> },
|
||||
|
@ -62,10 +78,10 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
|||
}
|
||||
|
||||
/// Process Request items sent by clients.
|
||||
pub fn process_request(self: &mut Self, msg: Request) -> Option<Response> {
|
||||
pub fn process_verified_request(self: &mut Self, msg: Request) -> Option<Response> {
|
||||
match msg {
|
||||
Request::Transaction(tr) => {
|
||||
if let Err(err) = self.acc.process_transaction(tr) {
|
||||
if let Err(err) = self.acc.process_verified_transaction(&tr, false) {
|
||||
eprintln!("Transaction error: {:?}", err);
|
||||
}
|
||||
None
|
||||
|
@ -98,12 +114,19 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
|||
let rsps = streamer::allocate(response_recycler);
|
||||
let rsps_ = rsps.clone();
|
||||
{
|
||||
let mut num = 0;
|
||||
let mut ursps = rsps.write().unwrap();
|
||||
let mut reqs = vec![];
|
||||
for packet in &msgs.read().unwrap().packets {
|
||||
let rsp_addr = packet.meta.get_addr();
|
||||
let sz = packet.meta.size;
|
||||
let req = deserialize(&packet.data[0..sz])?;
|
||||
if let Some(resp) = obj.lock().unwrap().process_request(req) {
|
||||
reqs.push((req, rsp_addr));
|
||||
}
|
||||
let reqs = filter_valid_requests(reqs);
|
||||
|
||||
let mut num = 0;
|
||||
let mut ursps = rsps.write().unwrap();
|
||||
for (req, rsp_addr) in reqs {
|
||||
if let Some(resp) = obj.lock().unwrap().process_verified_request(req) {
|
||||
if ursps.responses.len() <= num {
|
||||
ursps
|
||||
.responses
|
||||
|
@ -114,7 +137,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
|||
let len = v.len();
|
||||
rsp.data[..len].copy_from_slice(&v);
|
||||
rsp.meta.size = len;
|
||||
rsp.meta.set_addr(&packet.meta.get_addr());
|
||||
rsp.meta.set_addr(&rsp_addr);
|
||||
num += 1;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue