From 7d811afab16307080995812d7e8d507c4105dd4e Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 6 Apr 2018 15:21:49 -0600 Subject: [PATCH 1/8] Parallelize CPU sig verify --- src/ecdsa.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/ecdsa.rs b/src/ecdsa.rs index 9594d0167..64fc3658d 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -73,19 +73,17 @@ fn verify_packet(packet: &Packet) -> u8 { #[cfg(not(feature = "cuda"))] pub fn ed25519_verify(batches: &Vec) -> Vec> { - let mut locks = Vec::new(); - let mut rvs = Vec::new(); - for packets in batches { - locks.push(packets.read().unwrap()); - } - - for p in locks { - let mut v = Vec::new(); - v.resize(p.packets.len(), 0); - v = p.packets.par_iter().map(|x| verify_packet(x)).collect(); - rvs.push(v); - } - rvs + batches + .into_par_iter() + .map(|p| { + p.read() + .unwrap() + .packets + .par_iter() + .map(verify_packet) + .collect() + }) + .collect() } #[cfg(feature = "cuda")] From 81e2b36d38f4f72734db3ac11efa624b2837d601 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 6 Apr 2018 15:24:15 -0600 Subject: [PATCH 2/8] Cleanup packet_verify --- src/ecdsa.rs | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/src/ecdsa.rs b/src/ecdsa.rs index 64fc3658d..3b35cca1b 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -53,22 +53,17 @@ fn verify_packet(packet: &Packet) -> u8 { let pub_key_start = TX_OFFSET + PUB_KEY_OFFSET; let pub_key_end = pub_key_start + PUB_KEY_SIZE; - if packet.meta.size > msg_start { - let msg_end = packet.meta.size; - return if signature::verify( - &signature::ED25519, - untrusted::Input::from(&packet.data[pub_key_start..pub_key_end]), - untrusted::Input::from(&packet.data[msg_start..msg_end]), - untrusted::Input::from(&packet.data[sig_start..sig_end]), - ).is_ok() - { - 1 - } else { - 0 - }; - } else { + if packet.meta.size <= msg_start { return 0; } + + let msg_end = packet.meta.size; + signature::verify( + &signature::ED25519, + untrusted::Input::from(&packet.data[pub_key_start..pub_key_end]), + untrusted::Input::from(&packet.data[msg_start..msg_end]), + untrusted::Input::from(&packet.data[sig_start..sig_end]), + ).is_ok() as u8 } #[cfg(not(feature = "cuda"))] From 6fd32fe8509f605054a1699bfa93f1483c2b960d Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 6 Apr 2018 15:43:05 -0600 Subject: [PATCH 3/8] Cleanup constants --- src/ecdsa.rs | 35 ++++++++++++----------------------- src/transaction.rs | 11 +++++++---- 2 files changed, 19 insertions(+), 27 deletions(-) diff --git a/src/ecdsa.rs b/src/ecdsa.rs index 3b35cca1b..33b8fc40c 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -1,27 +1,8 @@ -// Cuda-only imports -#[cfg(feature = "cuda")] -use packet::PACKET_DATA_SIZE; -#[cfg(feature = "cuda")] +use packet::{Packet, SharedPackets}; +use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET}; use std::mem::size_of; -// Non-cuda imports -#[cfg(not(feature = "cuda"))] -use rayon::prelude::*; -#[cfg(not(feature = "cuda"))] -use ring::signature; -#[cfg(not(feature = "cuda"))] -use untrusted; - -// Shared imports -use packet::{Packet, SharedPackets}; - pub const TX_OFFSET: usize = 4; -pub const SIGNED_DATA_OFFSET: usize = 112; -pub const SIG_OFFSET: usize = 8; -pub const PUB_KEY_OFFSET: usize = 80; - -pub const SIG_SIZE: usize = 64; -pub const PUB_KEY_SIZE: usize = 32; #[cfg(feature = "cuda")] #[repr(C)] @@ -47,11 +28,15 @@ extern "C" { #[cfg(not(feature = "cuda"))] fn verify_packet(packet: &Packet) -> u8 { + use ring::signature; + use untrusted; + use signature::{PublicKey, Signature}; + let msg_start = TX_OFFSET + SIGNED_DATA_OFFSET; let sig_start = TX_OFFSET + SIG_OFFSET; - let sig_end = sig_start + SIG_SIZE; + let sig_end = sig_start + size_of::(); let pub_key_start = TX_OFFSET + PUB_KEY_OFFSET; - let pub_key_end = pub_key_start + PUB_KEY_SIZE; + let pub_key_end = pub_key_start + size_of::(); if packet.meta.size <= msg_start { return 0; @@ -68,6 +53,8 @@ fn verify_packet(packet: &Packet) -> u8 { #[cfg(not(feature = "cuda"))] pub fn ed25519_verify(batches: &Vec) -> Vec> { + use rayon::prelude::*; + batches .into_par_iter() .map(|p| { @@ -83,6 +70,8 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { #[cfg(feature = "cuda")] pub fn ed25519_verify(batches: &Vec) -> Vec> { + use packet::PACKET_DATA_SIZE; + let mut out = Vec::new(); let mut elems = Vec::new(); let mut locks = Vec::new(); diff --git a/src/transaction.rs b/src/transaction.rs index bf7127975..dda9581e2 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -7,6 +7,10 @@ use plan::{Condition, Payment, Plan}; use rayon::prelude::*; use signature::{KeyPair, KeyPairUtil, PublicKey, Signature, SignatureUtil}; +pub const SIGNED_DATA_OFFSET: usize = 112; +pub const SIG_OFFSET: usize = 8; +pub const PUB_KEY_OFFSET: usize = 80; + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] pub struct TransactionData { pub tokens: i64, @@ -125,7 +129,6 @@ pub fn verify_transactions(transactions: &[Transaction]) -> bool { mod tests { use super::*; use bincode::{deserialize, serialize}; - use ecdsa; #[test] fn test_claim() { @@ -196,9 +199,9 @@ mod tests { let tr = test_tx(); let sign_data = tr.get_sign_data(); let tx = serialize(&tr).unwrap(); - assert_matches!(memfind(&tx, &sign_data), Some(ecdsa::SIGNED_DATA_OFFSET)); - assert_matches!(memfind(&tx, &tr.sig), Some(ecdsa::SIG_OFFSET)); - assert_matches!(memfind(&tx, &tr.from), Some(ecdsa::PUB_KEY_OFFSET)); + assert_matches!(memfind(&tx, &sign_data), Some(SIGNED_DATA_OFFSET)); + assert_matches!(memfind(&tx, &tr.sig), Some(SIG_OFFSET)); + assert_matches!(memfind(&tx, &tr.from), Some(PUB_KEY_OFFSET)); } #[test] From 686908911185ab277f32229a03210c9e87c3ca3b Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 6 Apr 2018 15:52:58 -0600 Subject: [PATCH 4/8] Parallelize deserialize --- src/accountant_skel.rs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index e45efb83e..cc556ae65 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -139,18 +139,14 @@ impl AccountantSkel { } pub fn deserialize_packets(p: &packet::Packets) -> Vec> { - // TODO: deserealize in parallel - let mut r = vec![]; - for x in &p.packets { - let rsp_addr = x.meta.addr(); - let sz = x.meta.size; - if let Ok(req) = deserialize(&x.data[0..sz]) { - r.push(Some((req, rsp_addr))); - } else { - r.push(None); - } - } - r + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() } fn process( From 912a5f951eac7c44ded1e7eb0e1e86b9bc1755d6 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 6 Apr 2018 15:58:11 -0600 Subject: [PATCH 5/8] Why is msgs cloned here? --- src/accountant_skel.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index cc556ae65..4ed3640bf 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -159,7 +159,6 @@ impl AccountantSkel { let timer = Duration::new(1, 0); let (mms, vvs) = verified_receiver.recv_timeout(timer)?; for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) { - let msgs_ = msgs.clone(); let mut rsps = VecDeque::new(); { let reqs = Self::deserialize_packets(&((*msgs).read().unwrap())); @@ -187,7 +186,7 @@ impl AccountantSkel { //don't wake up the other side if there is nothing blob_sender.send(rsps)?; } - packet_recycler.recycle(msgs_); + packet_recycler.recycle(msgs); } Ok(()) } From 7bd3a8e004bc88b4365aec6e07ac203673541603 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 6 Apr 2018 16:12:13 -0600 Subject: [PATCH 6/8] Reduce cyclomatic complexity --- src/accountant_skel.rs | 54 ++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 4ed3640bf..2d8b699a3 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -149,6 +149,35 @@ impl AccountantSkel { .collect() } + fn process_packets( + obj: &Arc>>, + reqs: Vec>, + vers: Vec, + blob_recycler: &packet::BlobRecycler, + ) -> Result> { + let mut rsps = VecDeque::new(); + for (data, v) in reqs.into_iter().zip(vers.into_iter()) { + if let Some((req, rsp_addr)) = data { + if !req.verify() { + continue; + } + if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) { + let blob = blob_recycler.allocate(); + { + let mut b = blob.write().unwrap(); + let v = serialize(&resp)?; + let len = v.len(); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + } + rsps.push_back(blob); + } + } + } + Ok(rsps) + } + fn process( obj: &Arc>>, verified_receiver: &Receiver<(Vec, Vec>)>, @@ -159,29 +188,8 @@ impl AccountantSkel { let timer = Duration::new(1, 0); let (mms, vvs) = verified_receiver.recv_timeout(timer)?; for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) { - let mut rsps = VecDeque::new(); - { - let reqs = Self::deserialize_packets(&((*msgs).read().unwrap())); - for (data, v) in reqs.into_iter().zip(vers.into_iter()) { - if let Some((req, rsp_addr)) = data { - if !req.verify() { - continue; - } - if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) { - let blob = blob_recycler.allocate(); - { - let mut b = blob.write().unwrap(); - let v = serialize(&resp)?; - let len = v.len(); - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - } - rsps.push_back(blob); - } - } - } - } + let reqs = Self::deserialize_packets(&msgs.read().unwrap()); + let rsps = Self::process_packets(obj, reqs, vers, blob_recycler)?; if !rsps.is_empty() { //don't wake up the other side if there is nothing blob_sender.send(rsps)?; From a93ec03d2c816ed8e190afc9da11ed84400021bd Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 6 Apr 2018 16:21:20 -0600 Subject: [PATCH 7/8] Move creating blobs into its own function --- src/accountant_skel.rs | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 2d8b699a3..9e77b4894 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -149,6 +149,23 @@ impl AccountantSkel { .collect() } + fn create_blob_from_response( + blob_recycler: &packet::BlobRecycler, + resp: Response, + rsp_addr: SocketAddr, + ) -> Result { + let blob = blob_recycler.allocate(); + { + let mut b = blob.write().unwrap(); + let v = serialize(&resp)?; + let len = v.len(); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + } + Ok(blob) + } + fn process_packets( obj: &Arc>>, reqs: Vec>, @@ -162,15 +179,7 @@ impl AccountantSkel { continue; } if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) { - let blob = blob_recycler.allocate(); - { - let mut b = blob.write().unwrap(); - let v = serialize(&resp)?; - let len = v.len(); - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - } + let blob = Self::create_blob_from_response(blob_recycler, resp, rsp_addr)?; rsps.push_back(blob); } } @@ -189,10 +198,10 @@ impl AccountantSkel { let (mms, vvs) = verified_receiver.recv_timeout(timer)?; for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) { let reqs = Self::deserialize_packets(&msgs.read().unwrap()); - let rsps = Self::process_packets(obj, reqs, vers, blob_recycler)?; - if !rsps.is_empty() { + let blobs = Self::process_packets(obj, reqs, vers, blob_recycler)?; + if !blobs.is_empty() { //don't wake up the other side if there is nothing - blob_sender.send(rsps)?; + blob_sender.send(blobs)?; } packet_recycler.recycle(msgs); } From 584c8c07b811fd1d9202ea8d80d29c06ed0b3089 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 6 Apr 2018 16:34:52 -0600 Subject: [PATCH 8/8] Better symmetry deserialize -> process -> serialize --- src/accountant_skel.rs | 48 +++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 9e77b4894..df2c19f02 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -149,10 +149,29 @@ impl AccountantSkel { .collect() } - fn create_blob_from_response( - blob_recycler: &packet::BlobRecycler, + fn process_packets( + obj: &Arc>>, + reqs: Vec>, + vers: Vec, + ) -> Vec<(Response, SocketAddr)> { + let mut rsps = Vec::new(); + for (data, v) in reqs.into_iter().zip(vers.into_iter()) { + if let Some((req, rsp_addr)) = data { + if !req.verify() { + continue; + } + if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) { + rsps.push((resp, rsp_addr)); + } + } + } + rsps + } + + fn serialize_response( resp: Response, rsp_addr: SocketAddr, + blob_recycler: &packet::BlobRecycler, ) -> Result { let blob = blob_recycler.allocate(); { @@ -166,25 +185,15 @@ impl AccountantSkel { Ok(blob) } - fn process_packets( - obj: &Arc>>, - reqs: Vec>, - vers: Vec, + fn serialize_responses( + rsps: Vec<(Response, SocketAddr)>, blob_recycler: &packet::BlobRecycler, ) -> Result> { - let mut rsps = VecDeque::new(); - for (data, v) in reqs.into_iter().zip(vers.into_iter()) { - if let Some((req, rsp_addr)) = data { - if !req.verify() { - continue; - } - if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) { - let blob = Self::create_blob_from_response(blob_recycler, resp, rsp_addr)?; - rsps.push_back(blob); - } - } + let mut blobs = VecDeque::new(); + for (resp, rsp_addr) in rsps { + blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); } - Ok(rsps) + Ok(blobs) } fn process( @@ -198,7 +207,8 @@ impl AccountantSkel { let (mms, vvs) = verified_receiver.recv_timeout(timer)?; for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) { let reqs = Self::deserialize_packets(&msgs.read().unwrap()); - let blobs = Self::process_packets(obj, reqs, vers, blob_recycler)?; + let rsps = Self::process_packets(obj, reqs, vers); + let blobs = Self::serialize_responses(rsps, blob_recycler)?; if !blobs.is_empty() { //don't wake up the other side if there is nothing blob_sender.send(blobs)?;