Merge pull request #115 from garious/parallelize-accountant

More refactoring
This commit is contained in:
Greg Fitzgerald 2018-04-11 10:28:15 -06:00 committed by GitHub
commit 078179e9b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 44 additions and 38 deletions

View File

@ -53,11 +53,6 @@ impl Request {
} }
} }
/// Parallel verfication of a batch of requests.
pub fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, SocketAddr)> {
reqs.into_par_iter().filter({ |x| x.0.verify() }).collect()
}
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub enum Response { pub enum Response {
Balance { key: PublicKey, val: Option<i64> }, Balance { key: PublicKey, val: Option<i64> },
@ -112,28 +107,39 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
} }
} }
fn verifier( fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<Vec<SharedPackets>> {
recvr: &streamer::PacketReceiver,
sendr: &Sender<(Vec<SharedPackets>, Vec<Vec<u8>>)>,
) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?; let msgs = recvr.recv_timeout(timer)?;
trace!("got msgs"); trace!("got msgs");
let mut v = Vec::new(); let mut batch = vec![msgs];
v.push(msgs);
while let Ok(more) = recvr.try_recv() { while let Ok(more) = recvr.try_recv() {
trace!("got more msgs"); trace!("got more msgs");
v.push(more); batch.push(more);
} }
info!("batch {}", v.len()); info!("batch len {}", batch.len());
let chunk = max(1, (v.len() + 3) / 4); Ok(batch)
let chunks: Vec<_> = v.chunks(chunk).collect(); }
let rvs: Vec<_> = chunks
fn verify_batch(batch: Vec<SharedPackets>) -> Vec<Vec<(SharedPackets, Vec<u8>)>> {
let chunk_size = max(1, (batch.len() + 3) / 4);
let batches: Vec<_> = batch.chunks(chunk_size).map(|x| x.to_vec()).collect();
batches
.into_par_iter() .into_par_iter()
.map(|x| ecdsa::ed25519_verify(&x.to_vec())) .map(|batch| {
.collect(); let r = ecdsa::ed25519_verify(&batch);
for (v, r) in v.chunks(chunk).zip(rvs) { batch.into_iter().zip(r).collect()
sendr.send((v.to_vec(), r))?; })
.collect()
}
fn verifier(
recvr: &streamer::PacketReceiver,
sendr: &Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Result<()> {
let batch = Self::recv_batch(recvr)?;
let verified_batches = Self::verify_batch(batch);
for xs in verified_batches {
sendr.send(xs)?;
} }
Ok(()) Ok(())
} }
@ -151,21 +157,16 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
fn process_packets( fn process_packets(
obj: &Arc<Mutex<AccountantSkel<W>>>, obj: &Arc<Mutex<AccountantSkel<W>>>,
reqs: Vec<Option<(Request, SocketAddr)>>, req_vers: Vec<(Request, SocketAddr, u8)>,
vers: Vec<u8>,
) -> Vec<(Response, SocketAddr)> { ) -> Vec<(Response, SocketAddr)> {
let mut rsps = Vec::new(); req_vers
for (data, v) in reqs.into_iter().zip(vers.into_iter()) { .into_iter()
if let Some((req, rsp_addr)) = data { .filter_map(|(req, rsp_addr, v)| {
if !req.verify() { let mut skel = obj.lock().unwrap();
continue; skel.log_verified_request(req, v)
} .map(|resp| (resp, rsp_addr))
if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) { })
rsps.push((resp, rsp_addr)); .collect()
}
}
}
rsps
} }
fn serialize_response( fn serialize_response(
@ -198,16 +199,21 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
fn process( fn process(
obj: &Arc<Mutex<AccountantSkel<W>>>, obj: &Arc<Mutex<AccountantSkel<W>>>,
verified_receiver: &Receiver<(Vec<SharedPackets>, Vec<Vec<u8>>)>, verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
blob_sender: &streamer::BlobSender, blob_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler, packet_recycler: &packet::PacketRecycler,
blob_recycler: &packet::BlobRecycler, blob_recycler: &packet::BlobRecycler,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let (mms, vvs) = verified_receiver.recv_timeout(timer)?; let mms = verified_receiver.recv_timeout(timer)?;
for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) { for (msgs, vers) in mms {
let reqs = Self::deserialize_packets(&msgs.read().unwrap()); let reqs = Self::deserialize_packets(&msgs.read().unwrap());
let rsps = Self::process_packets(obj, reqs, vers); let req_vers = reqs.into_iter()
.zip(vers)
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
.filter(|x| x.0.verify())
.collect();
let rsps = Self::process_packets(obj, req_vers);
let blobs = Self::serialize_responses(rsps, blob_recycler)?; let blobs = Self::serialize_responses(rsps, blob_recycler)?;
if !blobs.is_empty() { if !blobs.is_empty() {
//don't wake up the other side if there is nothing //don't wake up the other side if there is nothing