diff --git a/src/thin_client_service.rs b/src/thin_client_service.rs index df46b15b1..3886f2e34 100644 --- a/src/thin_client_service.rs +++ b/src/thin_client_service.rs @@ -2,14 +2,65 @@ //! on behalf of thing clients. use accountant::Accountant; -use bincode::serialize; +use accounting_stage::AccountingStage; +use bincode::{deserialize, serialize}; use entry::Entry; +use event::Event; use hash::Hash; +use packet; +use packet::SharedPackets; +use rayon::prelude::*; +use result::Result; use signature::PublicKey; +use std::collections::VecDeque; use std::net::{SocketAddr, UdpSocket}; -//use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex}; use transaction::Transaction; +//use std::io::{Cursor, Write}; +//use std::sync::atomic::{AtomicBool, Ordering}; +//use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::mpsc::Receiver; +use std::sync::{Arc, Mutex}; +//use std::thread::{spawn, JoinHandle}; +use std::time::Duration; +use std::time::Instant; +use streamer; +use timing; + +#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Request { + Transaction(Transaction), + GetBalance { key: PublicKey }, + Subscribe { subscriptions: Vec }, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Subscription { + EntryInfo, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EntryInfo { + pub id: Hash, + pub num_hashes: u64, + pub num_events: u64, +} + +impl Request { + /// Verify the request is valid. + pub fn verify(&self) -> bool { + match *self { + Request::Transaction(ref tr) => tr.verify_plan(), + _ => true, + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum Response { + Balance { key: PublicKey, val: Option }, + EntryInfo(EntryInfo), +} pub struct ThinClientService { //pub output: Mutex>, @@ -88,40 +139,196 @@ impl ThinClientService { } } } -} -#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Request { - Transaction(Transaction), - GetBalance { key: PublicKey }, - Subscribe { subscriptions: Vec }, -} + fn deserialize_requests(p: &packet::Packets) -> Vec> { + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() + } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Subscription { - EntryInfo, -} + // Copy-paste of deserialize_requests() because I can't figure out how to + // route the lifetimes in a generic version. + pub fn deserialize_events(p: &packet::Packets) -> Vec> { + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() + } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct EntryInfo { - pub id: Hash, - pub num_hashes: u64, - pub num_events: u64, -} - -impl Request { - /// Verify the request is valid. - pub fn verify(&self) -> bool { - match *self { - Request::Transaction(ref tr) => tr.verify_plan(), - _ => true, + /// Split Request list into verified transactions and the rest + fn partition_requests( + req_vers: Vec<(Request, SocketAddr, u8)>, + ) -> (Vec, Vec<(Request, SocketAddr)>) { + let mut events = vec![]; + let mut reqs = vec![]; + for (msg, rsp_addr, verify) in req_vers { + match msg { + Request::Transaction(tr) => { + if verify != 0 { + events.push(Event::Transaction(tr)); + } + } + _ => reqs.push((msg, rsp_addr)), + } } + (events, reqs) + } + + fn serialize_response( + resp: Response, + rsp_addr: SocketAddr, + blob_recycler: &packet::BlobRecycler, + ) -> Result { + let blob = blob_recycler.allocate(); + { + let mut b = blob.write().unwrap(); + let v = serialize(&resp)?; + let len = v.len(); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + } + Ok(blob) + } + + fn serialize_responses( + rsps: Vec<(Response, SocketAddr)>, + blob_recycler: &packet::BlobRecycler, + ) -> Result> { + let mut blobs = VecDeque::new(); + for (resp, rsp_addr) in rsps { + blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); + } + Ok(blobs) + } + + pub fn process_request_packets( + &self, + accounting_stage: &AccountingStage, + verified_receiver: &Receiver)>>, + responder_sender: &streamer::BlobSender, + packet_recycler: &packet::PacketRecycler, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let recv_start = Instant::now(); + let mms = verified_receiver.recv_timeout(timer)?; + let mut reqs_len = 0; + let mms_len = mms.len(); + info!( + "@{:?} process start stalled for: {:?}ms batches: {}", + timing::timestamp(), + timing::duration_as_ms(&recv_start.elapsed()), + mms.len(), + ); + let proc_start = Instant::now(); + for (msgs, vers) in mms { + let reqs = Self::deserialize_requests(&msgs.read().unwrap()); + reqs_len += reqs.len(); + let req_vers = reqs.into_iter() + .zip(vers) + .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) + .filter(|x| { + let v = x.0.verify(); + v + }) + .collect(); + + debug!("partitioning"); + let (events, reqs) = Self::partition_requests(req_vers); + debug!("events: {} reqs: {}", events.len(), reqs.len()); + + debug!("process_events"); + accounting_stage.process_events(events)?; + debug!("done process_events"); + + debug!("process_requests"); + let rsps = self.process_requests(reqs); + debug!("done process_requests"); + + let blobs = Self::serialize_responses(rsps, blob_recycler)?; + if !blobs.is_empty() { + info!("process: sending blobs: {}", blobs.len()); + //don't wake up the other side if there is nothing + responder_sender.send(blobs)?; + } + packet_recycler.recycle(msgs); + } + let total_time_s = timing::duration_as_s(&proc_start.elapsed()); + let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); + info!( + "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", + timing::timestamp(), + mms_len, + total_time_ms, + reqs_len, + (reqs_len as f32) / (total_time_s) + ); + Ok(()) } } -#[derive(Serialize, Deserialize, Debug)] -pub enum Response { - Balance { key: PublicKey, val: Option }, - EntryInfo(EntryInfo), +#[cfg(test)] +pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { + let mut out = vec![]; + for rrs in reqs.chunks(packet::NUM_PACKETS) { + let p = r.allocate(); + p.write() + .unwrap() + .packets + .resize(rrs.len(), Default::default()); + for (i, o) in rrs.iter().zip(p.write().unwrap().packets.iter_mut()) { + let v = serialize(&i).expect("serialize request"); + let len = v.len(); + o.data[..len].copy_from_slice(&v); + o.meta.size = len; + } + out.push(p); + } + return out; +} + +#[cfg(test)] +mod tests { + use bincode::serialize; + use ecdsa; + use packet::{PacketRecycler, NUM_PACKETS}; + use transaction::{memfind, test_tx}; + use thin_client_service::{to_request_packets, Request}; + + #[test] + fn test_layout() { + let tr = test_tx(); + let tx = serialize(&tr).unwrap(); + let packet = serialize(&Request::Transaction(tr)).unwrap(); + assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET)); + assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); + } + + #[test] + fn test_to_packets() { + let tr = Request::Transaction(test_tx()); + let re = PacketRecycler::default(); + let rv = to_request_packets(&re, vec![tr.clone(); 1]); + assert_eq!(rv.len(), 1); + assert_eq!(rv[0].read().unwrap().packets.len(), 1); + + let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS]); + assert_eq!(rv.len(), 1); + assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); + + let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]); + assert_eq!(rv.len(), 2); + assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); + assert_eq!(rv[1].read().unwrap().packets.len(), 1); + } } diff --git a/src/tpu.rs b/src/tpu.rs index 97dd51063..d42c3bcf1 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -2,30 +2,27 @@ //! 5-stage transaction processing pipeline in software. use accounting_stage::AccountingStage; -use bincode::{deserialize, serialize}; use crdt::{Crdt, ReplicatedData}; use ecdsa; use entry::Entry; -use event::Event; use ledger; use packet; use packet::SharedPackets; use rand::{thread_rng, Rng}; -use rayon::prelude::*; use result::Result; use serde_json; use std::collections::VecDeque; use std::io::Write; use std::io::sink; -use std::net::{SocketAddr, UdpSocket}; +use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::mpsc::{channel, Sender}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use std::time::Instant; use streamer; -use thin_client_service::{Request, Response, ThinClientService}; +use thin_client_service::ThinClientService; use timing; pub struct Tpu { @@ -186,141 +183,6 @@ impl Tpu { Ok(()) } - pub fn deserialize_requests(p: &packet::Packets) -> Vec> { - p.packets - .par_iter() - .map(|x| { - deserialize(&x.data[0..x.meta.size]) - .map(|req| (req, x.meta.addr())) - .ok() - }) - .collect() - } - - // Copy-paste of deserialize_requests() because I can't figure out how to - // route the lifetimes in a generic version. - pub fn deserialize_events(p: &packet::Packets) -> Vec> { - p.packets - .par_iter() - .map(|x| { - deserialize(&x.data[0..x.meta.size]) - .map(|req| (req, x.meta.addr())) - .ok() - }) - .collect() - } - - /// Split Request list into verified transactions and the rest - fn partition_requests( - req_vers: Vec<(Request, SocketAddr, u8)>, - ) -> (Vec, Vec<(Request, SocketAddr)>) { - let mut events = vec![]; - let mut reqs = vec![]; - for (msg, rsp_addr, verify) in req_vers { - match msg { - Request::Transaction(tr) => { - if verify != 0 { - events.push(Event::Transaction(tr)); - } - } - _ => reqs.push((msg, rsp_addr)), - } - } - (events, reqs) - } - - fn serialize_response( - resp: Response, - rsp_addr: SocketAddr, - blob_recycler: &packet::BlobRecycler, - ) -> Result { - let blob = blob_recycler.allocate(); - { - let mut b = blob.write().unwrap(); - let v = serialize(&resp)?; - let len = v.len(); - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - } - Ok(blob) - } - - fn serialize_responses( - rsps: Vec<(Response, SocketAddr)>, - blob_recycler: &packet::BlobRecycler, - ) -> Result> { - let mut blobs = VecDeque::new(); - for (resp, rsp_addr) in rsps { - blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); - } - Ok(blobs) - } - - fn process_request_packets( - obj: &Tpu, - verified_receiver: &Receiver)>>, - responder_sender: &streamer::BlobSender, - packet_recycler: &packet::PacketRecycler, - blob_recycler: &packet::BlobRecycler, - ) -> Result<()> { - let timer = Duration::new(1, 0); - let recv_start = Instant::now(); - let mms = verified_receiver.recv_timeout(timer)?; - let mut reqs_len = 0; - let mms_len = mms.len(); - info!( - "@{:?} process start stalled for: {:?}ms batches: {}", - timing::timestamp(), - timing::duration_as_ms(&recv_start.elapsed()), - mms.len(), - ); - let proc_start = Instant::now(); - for (msgs, vers) in mms { - let reqs = Self::deserialize_requests(&msgs.read().unwrap()); - reqs_len += reqs.len(); - let req_vers = reqs.into_iter() - .zip(vers) - .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) - .filter(|x| { - let v = x.0.verify(); - v - }) - .collect(); - - debug!("partitioning"); - let (events, reqs) = Self::partition_requests(req_vers); - debug!("events: {} reqs: {}", events.len(), reqs.len()); - - debug!("process_events"); - obj.accounting_stage.process_events(events)?; - debug!("done process_events"); - - debug!("process_requests"); - let rsps = obj.thin_client_service.process_requests(reqs); - debug!("done process_requests"); - - let blobs = Self::serialize_responses(rsps, blob_recycler)?; - if !blobs.is_empty() { - info!("process: sending blobs: {}", blobs.len()); - //don't wake up the other side if there is nothing - responder_sender.send(blobs)?; - } - packet_recycler.recycle(msgs); - } - let total_time_s = timing::duration_as_s(&proc_start.elapsed()); - let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); - info!( - "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", - timing::timestamp(), - mms_len, - total_time_ms, - reqs_len, - (reqs_len as f32) / (total_time_s) - ); - Ok(()) - } - /// Process verified blobs, already in order /// Respond with a signed hash of the state fn replicate_state( @@ -413,8 +275,8 @@ impl Tpu { let tpu = obj.clone(); let t_server = spawn(move || loop { - let e = Self::process_request_packets( - &mut tpu.clone(), + let e = tpu.thin_client_service.process_request_packets( + &tpu.accounting_stage, &verified_receiver, &responder_sender, &packet_recycler, @@ -559,8 +421,8 @@ impl Tpu { let tpu = obj.clone(); let s_exit = exit.clone(); let t_server = spawn(move || loop { - let e = Self::process_request_packets( - &mut tpu.clone(), + let e = tpu.thin_client_service.process_request_packets( + &tpu.accounting_stage, &verified_receiver, &responder_sender, &packet_recycler, @@ -592,26 +454,6 @@ impl Tpu { } } -#[cfg(test)] -pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { - let mut out = vec![]; - for rrs in reqs.chunks(packet::NUM_PACKETS) { - let p = r.allocate(); - p.write() - .unwrap() - .packets - .resize(rrs.len(), Default::default()); - for (i, o) in rrs.iter().zip(p.write().unwrap().packets.iter_mut()) { - let v = serialize(&i).expect("serialize request"); - let len = v.len(); - o.data[..len].copy_from_slice(&v); - o.meta.size = len; - } - out.push(p); - } - return out; -} - #[cfg(test)] pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { use signature::{KeyPair, KeyPairUtil}; @@ -637,13 +479,12 @@ mod tests { use bincode::serialize; use chrono::prelude::*; use crdt::Crdt; - use ecdsa; use entry; use event::Event; use hash::{hash, Hash}; use logger; use mint::Mint; - use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS}; + use packet::BlobRecycler; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; use std::sync::atomic::{AtomicBool, Ordering}; @@ -651,34 +492,8 @@ mod tests { use std::sync::{Arc, RwLock}; use std::time::Duration; use streamer; - use tpu::{test_node, to_request_packets, Request, Tpu}; - use transaction::{memfind, test_tx, Transaction}; - - #[test] - fn test_layout() { - let tr = test_tx(); - let tx = serialize(&tr).unwrap(); - let packet = serialize(&Request::Transaction(tr)).unwrap(); - assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET)); - assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); - } - #[test] - fn test_to_packets() { - let tr = Request::Transaction(test_tx()); - let re = PacketRecycler::default(); - let rv = to_request_packets(&re, vec![tr.clone(); 1]); - assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().unwrap().packets.len(), 1); - - let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS]); - assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); - - let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]); - assert_eq!(rv.len(), 2); - assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); - assert_eq!(rv[1].read().unwrap().packets.len(), 1); - } + use tpu::{test_node, Tpu}; + use transaction::Transaction; /// Test that mesasge sent from leader to target1 and repliated to target2 #[test]