From d1948b5a00f4fde40f19bb4e6b37398189be3d60 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 10 Apr 2018 21:12:59 -0600 Subject: [PATCH 1/4] Zip earlier And remove redundant into_iter() calls. --- src/accountant_skel.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index df2c19f02..95e6b3717 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -114,7 +114,7 @@ impl AccountantSkel { fn verifier( recvr: &streamer::PacketReceiver, - sendr: &Sender<(Vec, Vec>)>, + sendr: &Sender)>>, ) -> Result<()> { let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; @@ -133,7 +133,8 @@ impl AccountantSkel { .map(|x| ecdsa::ed25519_verify(&x.to_vec())) .collect(); for (v, r) in v.chunks(chunk).zip(rvs) { - sendr.send((v.to_vec(), r))?; + let xs = v.to_vec().into_iter().zip(r).collect(); + sendr.send(xs)?; } Ok(()) } @@ -155,7 +156,7 @@ impl AccountantSkel { vers: Vec, ) -> Vec<(Response, SocketAddr)> { let mut rsps = Vec::new(); - for (data, v) in reqs.into_iter().zip(vers.into_iter()) { + for (data, v) in reqs.into_iter().zip(vers) { if let Some((req, rsp_addr)) = data { if !req.verify() { continue; @@ -198,14 +199,14 @@ impl AccountantSkel { fn process( obj: &Arc>>, - verified_receiver: &Receiver<(Vec, Vec>)>, + verified_receiver: &Receiver)>>, blob_sender: &streamer::BlobSender, packet_recycler: &packet::PacketRecycler, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); - let (mms, vvs) = verified_receiver.recv_timeout(timer)?; - for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) { + let mms = verified_receiver.recv_timeout(timer)?; + for (msgs, vers) in mms { let reqs = Self::deserialize_packets(&msgs.read().unwrap()); let rsps = Self::process_packets(obj, reqs, vers); let blobs = Self::serialize_responses(rsps, blob_recycler)?; From e5f7eeedbfaff411925bcb3826c33b1aebb321e2 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 10 Apr 2018 21:43:53 -0600 Subject: [PATCH 2/4] Use iterators --- src/accountant_skel.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 95e6b3717..e762c2941 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -152,21 +152,16 @@ impl AccountantSkel { fn process_packets( obj: &Arc>>, - reqs: Vec>, - vers: Vec, + req_vers: Vec<(Request, SocketAddr, u8)>, ) -> Vec<(Response, SocketAddr)> { - let mut rsps = Vec::new(); - for (data, v) in reqs.into_iter().zip(vers) { - if let Some((req, rsp_addr)) = data { - if !req.verify() { - continue; - } - if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) { - rsps.push((resp, rsp_addr)); - } - } - } - rsps + req_vers + .into_iter() + .filter_map(|(req, rsp_addr, v)| { + let mut skel = obj.lock().unwrap(); + skel.log_verified_request(req, v) + .map(|resp| (resp, rsp_addr)) + }) + .collect() } fn serialize_response( @@ -208,7 +203,12 @@ impl AccountantSkel { let mms = verified_receiver.recv_timeout(timer)?; for (msgs, vers) in mms { let reqs = Self::deserialize_packets(&msgs.read().unwrap()); - let rsps = Self::process_packets(obj, reqs, vers); + let req_vers = reqs.into_iter() + .zip(vers) + .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) + .filter(|x| x.0.verify()) + .collect(); + let rsps = Self::process_packets(obj, req_vers); let blobs = Self::serialize_responses(rsps, blob_recycler)?; if !blobs.is_empty() { //don't wake up the other side if there is nothing From e838a8c28abb098e8784da92c09734f3f0a86e35 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 10 Apr 2018 21:56:13 -0600 Subject: [PATCH 3/4] Delete unused function --- src/accountant_skel.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index e762c2941..3ade01738 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -53,11 +53,6 @@ impl Request { } } -/// Parallel verfication of a batch of requests. -pub fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, SocketAddr)> { - reqs.into_par_iter().filter({ |x| x.0.verify() }).collect() -} - #[derive(Serialize, Deserialize, Debug)] pub enum Response { Balance { key: PublicKey, val: Option }, From 50ccecdff5fbfd550928d1dee52d14429d8aca94 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 11 Apr 2018 09:02:33 -0600 Subject: [PATCH 4/4] Refactor --- src/accountant_skel.rs | 46 +++++++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 3ade01738..47632cd2e 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -107,28 +107,38 @@ impl AccountantSkel { } } + fn recv_batch(recvr: &streamer::PacketReceiver) -> Result> { + let timer = Duration::new(1, 0); + let msgs = recvr.recv_timeout(timer)?; + trace!("got msgs"); + let mut batch = vec![msgs]; + while let Ok(more) = recvr.try_recv() { + trace!("got more msgs"); + batch.push(more); + } + info!("batch len {}", batch.len()); + Ok(batch) + } + + fn verify_batch(batch: Vec) -> Vec)>> { + let chunk_size = max(1, (batch.len() + 3) / 4); + let batches: Vec<_> = batch.chunks(chunk_size).map(|x| x.to_vec()).collect(); + batches + .into_par_iter() + .map(|batch| { + let r = ecdsa::ed25519_verify(&batch); + batch.into_iter().zip(r).collect() + }) + .collect() + } + fn verifier( recvr: &streamer::PacketReceiver, sendr: &Sender)>>, ) -> Result<()> { - let timer = Duration::new(1, 0); - let msgs = recvr.recv_timeout(timer)?; - trace!("got msgs"); - let mut v = Vec::new(); - v.push(msgs); - while let Ok(more) = recvr.try_recv() { - trace!("got more msgs"); - v.push(more); - } - info!("batch {}", v.len()); - let chunk = max(1, (v.len() + 3) / 4); - let chunks: Vec<_> = v.chunks(chunk).collect(); - let rvs: Vec<_> = chunks - .into_par_iter() - .map(|x| ecdsa::ed25519_verify(&x.to_vec())) - .collect(); - for (v, r) in v.chunks(chunk).zip(rvs) { - let xs = v.to_vec().into_iter().zip(r).collect(); + let batch = Self::recv_batch(recvr)?; + let verified_batches = Self::verify_batch(batch); + for xs in verified_batches { sendr.send(xs)?; } Ok(())