diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index e45efb83e..df2c19f02 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -139,18 +139,61 @@ impl AccountantSkel { } pub fn deserialize_packets(p: &packet::Packets) -> Vec> { - // TODO: deserealize in parallel - let mut r = vec![]; - for x in &p.packets { - let rsp_addr = x.meta.addr(); - let sz = x.meta.size; - if let Ok(req) = deserialize(&x.data[0..sz]) { - r.push(Some((req, rsp_addr))); - } else { - r.push(None); + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() + } + + fn process_packets( + obj: &Arc>>, + reqs: Vec>, + vers: Vec, + ) -> Vec<(Response, SocketAddr)> { + let mut rsps = Vec::new(); + for (data, v) in reqs.into_iter().zip(vers.into_iter()) { + if let Some((req, rsp_addr)) = data { + if !req.verify() { + continue; + } + if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) { + rsps.push((resp, rsp_addr)); + } } } - r + rsps + } + + fn serialize_response( + resp: Response, + rsp_addr: SocketAddr, + blob_recycler: &packet::BlobRecycler, + ) -> Result { + let blob = blob_recycler.allocate(); + { + let mut b = blob.write().unwrap(); + let v = serialize(&resp)?; + let len = v.len(); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + } + Ok(blob) + } + + fn serialize_responses( + rsps: Vec<(Response, SocketAddr)>, + blob_recycler: &packet::BlobRecycler, + ) -> Result> { + let mut blobs = VecDeque::new(); + for (resp, rsp_addr) in rsps { + blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); + } + Ok(blobs) } fn process( @@ -163,35 +206,14 @@ impl AccountantSkel { let timer = Duration::new(1, 0); let (mms, vvs) = verified_receiver.recv_timeout(timer)?; for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) { - let msgs_ = msgs.clone(); - let mut rsps = VecDeque::new(); - { - let reqs = Self::deserialize_packets(&((*msgs).read().unwrap())); - for (data, v) in reqs.into_iter().zip(vers.into_iter()) { - if let Some((req, rsp_addr)) = data { - if !req.verify() { - continue; - } - if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) { - let blob = blob_recycler.allocate(); - { - let mut b = blob.write().unwrap(); - let v = serialize(&resp)?; - let len = v.len(); - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - } - rsps.push_back(blob); - } - } - } - } - if !rsps.is_empty() { + let reqs = Self::deserialize_packets(&msgs.read().unwrap()); + let rsps = Self::process_packets(obj, reqs, vers); + let blobs = Self::serialize_responses(rsps, blob_recycler)?; + if !blobs.is_empty() { //don't wake up the other side if there is nothing - blob_sender.send(rsps)?; + blob_sender.send(blobs)?; } - packet_recycler.recycle(msgs_); + packet_recycler.recycle(msgs); } Ok(()) } diff --git a/src/ecdsa.rs b/src/ecdsa.rs index 9594d0167..33b8fc40c 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -1,27 +1,8 @@ -// Cuda-only imports -#[cfg(feature = "cuda")] -use packet::PACKET_DATA_SIZE; -#[cfg(feature = "cuda")] +use packet::{Packet, SharedPackets}; +use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET}; use std::mem::size_of; -// Non-cuda imports -#[cfg(not(feature = "cuda"))] -use rayon::prelude::*; -#[cfg(not(feature = "cuda"))] -use ring::signature; -#[cfg(not(feature = "cuda"))] -use untrusted; - -// Shared imports -use packet::{Packet, SharedPackets}; - pub const TX_OFFSET: usize = 4; -pub const SIGNED_DATA_OFFSET: usize = 112; -pub const SIG_OFFSET: usize = 8; -pub const PUB_KEY_OFFSET: usize = 80; - -pub const SIG_SIZE: usize = 64; -pub const PUB_KEY_SIZE: usize = 32; #[cfg(feature = "cuda")] #[repr(C)] @@ -47,49 +28,50 @@ extern "C" { #[cfg(not(feature = "cuda"))] fn verify_packet(packet: &Packet) -> u8 { + use ring::signature; + use untrusted; + use signature::{PublicKey, Signature}; + let msg_start = TX_OFFSET + SIGNED_DATA_OFFSET; let sig_start = TX_OFFSET + SIG_OFFSET; - let sig_end = sig_start + SIG_SIZE; + let sig_end = sig_start + size_of::(); let pub_key_start = TX_OFFSET + PUB_KEY_OFFSET; - let pub_key_end = pub_key_start + PUB_KEY_SIZE; + let pub_key_end = pub_key_start + size_of::(); - if packet.meta.size > msg_start { - let msg_end = packet.meta.size; - return if signature::verify( - &signature::ED25519, - untrusted::Input::from(&packet.data[pub_key_start..pub_key_end]), - untrusted::Input::from(&packet.data[msg_start..msg_end]), - untrusted::Input::from(&packet.data[sig_start..sig_end]), - ).is_ok() - { - 1 - } else { - 0 - }; - } else { + if packet.meta.size <= msg_start { return 0; } + + let msg_end = packet.meta.size; + signature::verify( + &signature::ED25519, + untrusted::Input::from(&packet.data[pub_key_start..pub_key_end]), + untrusted::Input::from(&packet.data[msg_start..msg_end]), + untrusted::Input::from(&packet.data[sig_start..sig_end]), + ).is_ok() as u8 } #[cfg(not(feature = "cuda"))] pub fn ed25519_verify(batches: &Vec) -> Vec> { - let mut locks = Vec::new(); - let mut rvs = Vec::new(); - for packets in batches { - locks.push(packets.read().unwrap()); - } + use rayon::prelude::*; - for p in locks { - let mut v = Vec::new(); - v.resize(p.packets.len(), 0); - v = p.packets.par_iter().map(|x| verify_packet(x)).collect(); - rvs.push(v); - } - rvs + batches + .into_par_iter() + .map(|p| { + p.read() + .unwrap() + .packets + .par_iter() + .map(verify_packet) + .collect() + }) + .collect() } #[cfg(feature = "cuda")] pub fn ed25519_verify(batches: &Vec) -> Vec> { + use packet::PACKET_DATA_SIZE; + let mut out = Vec::new(); let mut elems = Vec::new(); let mut locks = Vec::new(); diff --git a/src/transaction.rs b/src/transaction.rs index bf7127975..dda9581e2 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -7,6 +7,10 @@ use plan::{Condition, Payment, Plan}; use rayon::prelude::*; use signature::{KeyPair, KeyPairUtil, PublicKey, Signature, SignatureUtil}; +pub const SIGNED_DATA_OFFSET: usize = 112; +pub const SIG_OFFSET: usize = 8; +pub const PUB_KEY_OFFSET: usize = 80; + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] pub struct TransactionData { pub tokens: i64, @@ -125,7 +129,6 @@ pub fn verify_transactions(transactions: &[Transaction]) -> bool { mod tests { use super::*; use bincode::{deserialize, serialize}; - use ecdsa; #[test] fn test_claim() { @@ -196,9 +199,9 @@ mod tests { let tr = test_tx(); let sign_data = tr.get_sign_data(); let tx = serialize(&tr).unwrap(); - assert_matches!(memfind(&tx, &sign_data), Some(ecdsa::SIGNED_DATA_OFFSET)); - assert_matches!(memfind(&tx, &tr.sig), Some(ecdsa::SIG_OFFSET)); - assert_matches!(memfind(&tx, &tr.from), Some(ecdsa::PUB_KEY_OFFSET)); + assert_matches!(memfind(&tx, &sign_data), Some(SIGNED_DATA_OFFSET)); + assert_matches!(memfind(&tx, &tr.sig), Some(SIG_OFFSET)); + assert_matches!(memfind(&tx, &tr.from), Some(PUB_KEY_OFFSET)); } #[test]