From d239d4a49597b774ab00f24c4ae566eb0bdaf993 Mon Sep 17 00:00:00 2001
From: Greg Fitzgerald
Date: Sat, 12 May 2018 17:57:28 -0600
Subject: [PATCH] Add missing files

---
 .gitignore               |   1 +
 src/request.rs           |  99 +++++++++++++++++
 src/request_processor.rs | 232 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 332 insertions(+)
 create mode 100644 src/request.rs
 create mode 100644 src/request_processor.rs

diff --git a/.gitignore b/.gitignore
index f8d7348a3..3d4a8febf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 Cargo.lock
 /target/
 **/*.rs.bk
+.cargo

diff --git a/src/request.rs b/src/request.rs
new file mode 100644
index 000000000..7575010ad
--- /dev/null
+++ b/src/request.rs
@@ -0,0 +1,99 @@
+//! The `request` module defines the messages for the thin client.
+
+use bincode::serialize;
+use hash::Hash;
+use packet;
+use packet::SharedPackets;
+use signature::PublicKey;
+use transaction::Transaction;
+
+#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub enum Request {
+    Transaction(Transaction),
+    GetBalance { key: PublicKey },
+    Subscribe { subscriptions: Vec<Subscription> },
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub enum Subscription {
+    EntryInfo,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct EntryInfo {
+    pub id: Hash,
+    pub num_hashes: u64,
+    pub num_events: u64,
+}
+
+impl Request {
+    /// Verify the request is valid.
+    pub fn verify(&self) -> bool {
+        match *self {
+            Request::Transaction(ref tr) => tr.verify_plan(),
+            _ => true,
+        }
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub enum Response {
+    Balance { key: PublicKey, val: Option<i64> },
+    EntryInfo(EntryInfo),
+}
+
+pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> {
+    let mut out = vec![];
+    for rrs in reqs.chunks(packet::NUM_PACKETS) {
+        let p = r.allocate();
+        p.write()
+            .unwrap()
+            .packets
+            .resize(rrs.len(), Default::default());
+        for (i, o) in rrs.iter().zip(p.write().unwrap().packets.iter_mut()) {
+            let v = serialize(&i).expect("serialize request");
+            let len = v.len();
+            o.data[..len].copy_from_slice(&v);
+            o.meta.size = len;
+        }
+        out.push(p);
+    }
+    return out;
+}
+
+#[cfg(test)]
+mod tests {
+    use bincode::serialize;
+    use ecdsa;
+    use packet::{PacketRecycler, NUM_PACKETS};
+    use request::{to_request_packets, Request};
+    use transaction::{memfind, test_tx};
+
+    #[test]
+    fn test_layout() {
+        let tr = test_tx();
+        let tx = serialize(&tr).unwrap();
+        let packet = serialize(&Request::Transaction(tr)).unwrap();
+        assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET));
+        assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None);
+    }
+
+    #[test]
+    fn test_to_packets() {
+        let tr = Request::Transaction(test_tx());
+        let re = PacketRecycler::default();
+        let rv = to_request_packets(&re, vec![tr.clone(); 1]);
+        assert_eq!(rv.len(), 1);
+        assert_eq!(rv[0].read().unwrap().packets.len(), 1);
+
+        let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS]);
+        assert_eq!(rv.len(), 1);
+        assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
+
+        let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]);
+        assert_eq!(rv.len(), 2);
+        assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
+        assert_eq!(rv[1].read().unwrap().packets.len(), 1);
+    }
+}
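For context, a minimal sketch of how a thin client might drive the API above: pack a `Request::GetBalance` with `to_request_packets`, send it over UDP, and decode the `Response::Balance` reply. The `query_balance` helper, the socket setup, and the 1024-byte buffer are illustrative assumptions, not part of this patch.

```rust
// Hypothetical client-side sketch; only `Request`, `Response`, and
// `to_request_packets` come from this patch.
use bincode::deserialize;
use packet::PacketRecycler;
use request::{to_request_packets, Request, Response};
use signature::PublicKey;
use std::io;
use std::net::UdpSocket;

fn query_balance(key: PublicKey, server_addr: &str) -> io::Result<()> {
    let recycler = PacketRecycler::default();

    // A single request fits in one batch (batches hold up to NUM_PACKETS).
    let batches = to_request_packets(&recycler, vec![Request::GetBalance { key }]);

    let socket = UdpSocket::bind("0.0.0.0:0")?;
    for p in &batches[0].read().unwrap().packets {
        socket.send_to(&p.data[..p.meta.size], server_addr)?;
    }

    // Block on a single reply and decode it.
    let mut buf = [0u8; 1024];
    let (n, _) = socket.recv_from(&mut buf)?;
    if let Ok(Response::Balance { key, val }) = deserialize(&buf[..n]) {
        println!("balance of {:?}: {:?}", key, val);
    }
    Ok(())
}
```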
diff --git a/src/request_processor.rs b/src/request_processor.rs
new file mode 100644
index 000000000..6f2f3bd7c
--- /dev/null
+++ b/src/request_processor.rs
@@ -0,0 +1,232 @@
+//! The `request_processor` module processes thin client Request messages.
+
+use accountant::Accountant;
+use bincode::{deserialize, serialize};
+use entry::Entry;
+use event::Event;
+use event_processor::EventProcessor;
+use packet;
+use packet::SharedPackets;
+use rayon::prelude::*;
+use request::{EntryInfo, Request, Response, Subscription};
+use result::Result;
+use std::collections::VecDeque;
+use std::net::{SocketAddr, UdpSocket};
+use std::sync::mpsc::{Receiver, Sender};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use std::time::Instant;
+use streamer;
+use timing;
+
+pub struct RequestProcessor {
+    accountant: Arc<Accountant>,
+    entry_info_subscribers: Mutex<Vec<SocketAddr>>,
+}
+
+impl RequestProcessor {
+    /// Create a new RequestProcessor that wraps the given Accountant.
+    pub fn new(accountant: Arc<Accountant>) -> Self {
+        RequestProcessor {
+            accountant,
+            entry_info_subscribers: Mutex::new(vec![]),
+        }
+    }
+
+    /// Process Request items sent by clients.
+    fn process_request(
+        &self,
+        msg: Request,
+        rsp_addr: SocketAddr,
+    ) -> Option<(Response, SocketAddr)> {
+        match msg {
+            Request::GetBalance { key } => {
+                let val = self.accountant.get_balance(&key);
+                let rsp = (Response::Balance { key, val }, rsp_addr);
+                info!("Response::Balance {:?}", rsp);
+                Some(rsp)
+            }
+            Request::Transaction(_) => unreachable!(),
+            Request::Subscribe { subscriptions } => {
+                for subscription in subscriptions {
+                    match subscription {
+                        Subscription::EntryInfo => {
+                            self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
+                        }
+                    }
+                }
+                None
+            }
+        }
+    }
+
+    pub fn process_requests(
+        &self,
+        reqs: Vec<(Request, SocketAddr)>,
+    ) -> Vec<(Response, SocketAddr)> {
+        reqs.into_iter()
+            .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
+            .collect()
+    }
+
+    pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
+        // TODO: No need to bind().
+        let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
+
+        // copy subscribers to avoid taking lock while doing io
+        let addrs = self.entry_info_subscribers.lock().unwrap().clone();
+        trace!("Sending to {} addrs", addrs.len());
+        for addr in addrs {
+            let entry_info = EntryInfo {
+                id: entry.id,
+                num_hashes: entry.num_hashes,
+                num_events: entry.events.len() as u64,
+            };
+            let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
+            trace!("sending {} to {}", data.len(), addr);
+            //TODO dont do IO here, this needs to be on a separate channel
+            let res = socket.send_to(&data, addr);
+            if res.is_err() {
+                eprintln!("couldn't send response: {:?}", res);
+            }
+        }
+    }
+
+    fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
+        p.packets
+            .par_iter()
+            .map(|x| {
+                deserialize(&x.data[0..x.meta.size])
+                    .map(|req| (req, x.meta.addr()))
+                    .ok()
+            })
+            .collect()
+    }
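The next function is a verbatim copy of `deserialize_requests`, specialized to `Event`. A generic helper bounded by serde's `DeserializeOwned` would sidestep the lifetime problem the comment below describes, since owned deserialization borrows nothing from the packet buffer. A sketch, not part of this patch (the name `deserialize_packets` is hypothetical; it assumes the module's existing imports plus `use serde::de::DeserializeOwned;`):

```rust
// Hypothetical generic version of the two deserialize_* methods.
// `DeserializeOwned` means the result borrows nothing from `x.data`,
// so no lifetime parameter needs to be routed through the signature.
// `Send` is required to collect from rayon's parallel iterator.
fn deserialize_packets<T: DeserializeOwned + Send>(
    p: &packet::Packets,
) -> Vec<Option<(T, SocketAddr)>> {
    p.packets
        .par_iter()
        .map(|x| {
            deserialize(&x.data[0..x.meta.size])
                .map(|msg| (msg, x.meta.addr()))
                .ok()
        })
        .collect()
}
```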
+
+    // Copy-paste of deserialize_requests() because I can't figure out how to
+    // route the lifetimes in a generic version.
+    pub fn deserialize_events(p: &packet::Packets) -> Vec<Option<(Event, SocketAddr)>> {
+        p.packets
+            .par_iter()
+            .map(|x| {
+                deserialize(&x.data[0..x.meta.size])
+                    .map(|req| (req, x.meta.addr()))
+                    .ok()
+            })
+            .collect()
+    }
+
+    /// Split Request list into verified transactions and the rest
+    fn partition_requests(
+        req_vers: Vec<(Request, SocketAddr, u8)>,
+    ) -> (Vec<Event>, Vec<(Request, SocketAddr)>) {
+        let mut events = vec![];
+        let mut reqs = vec![];
+        for (msg, rsp_addr, verify) in req_vers {
+            match msg {
+                Request::Transaction(tr) => {
+                    if verify != 0 {
+                        events.push(Event::Transaction(tr));
+                    }
+                }
+                _ => reqs.push((msg, rsp_addr)),
+            }
+        }
+        (events, reqs)
+    }
+
+    fn serialize_response(
+        resp: Response,
+        rsp_addr: SocketAddr,
+        blob_recycler: &packet::BlobRecycler,
+    ) -> Result<packet::SharedBlob> {
+        let blob = blob_recycler.allocate();
+        {
+            let mut b = blob.write().unwrap();
+            let v = serialize(&resp)?;
+            let len = v.len();
+            b.data[..len].copy_from_slice(&v);
+            b.meta.size = len;
+            b.meta.set_addr(&rsp_addr);
+        }
+        Ok(blob)
+    }
+
+    fn serialize_responses(
+        rsps: Vec<(Response, SocketAddr)>,
+        blob_recycler: &packet::BlobRecycler,
+    ) -> Result<VecDeque<packet::SharedBlob>> {
+        let mut blobs = VecDeque::new();
+        for (resp, rsp_addr) in rsps {
+            blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?);
+        }
+        Ok(blobs)
+    }
+
+    pub fn process_request_packets(
+        &self,
+        event_processor: &EventProcessor,
+        verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
+        entry_sender: &Sender<Entry>,
+        blob_sender: &streamer::BlobSender,
+        packet_recycler: &packet::PacketRecycler,
+        blob_recycler: &packet::BlobRecycler,
+    ) -> Result<()> {
+        let timer = Duration::new(1, 0);
+        let recv_start = Instant::now();
+        let mms = verified_receiver.recv_timeout(timer)?;
+        let mut reqs_len = 0;
+        let mms_len = mms.len();
+        info!(
+            "@{:?} process start stalled for: {:?}ms batches: {}",
+            timing::timestamp(),
+            timing::duration_as_ms(&recv_start.elapsed()),
+            mms.len(),
+        );
+        let proc_start = Instant::now();
+        for (msgs, vers) in mms {
+            let reqs = Self::deserialize_requests(&msgs.read().unwrap());
+            reqs_len += reqs.len();
+            let req_vers = reqs.into_iter()
+                .zip(vers)
+                .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
+                .filter(|x| {
+                    let v = x.0.verify();
+                    v
+                })
+                .collect();
+
+            debug!("partitioning");
+            let (events, reqs) = Self::partition_requests(req_vers);
+            debug!("events: {} reqs: {}", events.len(), reqs.len());
+
+            debug!("process_events");
+            let entry = event_processor.process_events(events)?;
+            entry_sender.send(entry)?;
+            debug!("done process_events");
+
+            debug!("process_requests");
+            let rsps = self.process_requests(reqs);
+            debug!("done process_requests");
+
+            let blobs = Self::serialize_responses(rsps, blob_recycler)?;
+            if !blobs.is_empty() {
+                info!("process: sending blobs: {}", blobs.len());
+                //don't wake up the other side if there is nothing
+                blob_sender.send(blobs)?;
+            }
+            packet_recycler.recycle(msgs);
+        }
+        let total_time_s = timing::duration_as_s(&proc_start.elapsed());
+        let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
+        info!(
+            "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
+            timing::timestamp(),
+            mms_len,
+            total_time_ms,
+            reqs_len,
+            (reqs_len as f32) / (total_time_s)
+        );
+        Ok(())
+    }
+}
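Finally, a rough sketch of how this processor might be wired into a stage thread, mirroring the other pipeline stages in this codebase. The `start_request_stage` function, the exit flag, the channel plumbing, and the assumption that `BlobRecycler` implements `Default` like `PacketRecycler` are all illustrative, not part of this patch.

```rust
// Hypothetical wiring of RequestProcessor into a service loop.
use entry::Entry;
use event_processor::EventProcessor;
use packet::{self, SharedPackets};
use request_processor::RequestProcessor;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::thread::{spawn, JoinHandle};
use streamer;

fn start_request_stage(
    processor: Arc<RequestProcessor>,
    event_processor: Arc<EventProcessor>,
    exit: Arc<AtomicBool>,
    verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
    blob_sender: streamer::BlobSender,
) -> (JoinHandle<()>, Receiver<Entry>) {
    let packet_recycler = packet::PacketRecycler::default();
    let blob_recycler = packet::BlobRecycler::default(); // assumed Default impl
    let (entry_sender, entry_receiver) = channel();
    let handle = spawn(move || loop {
        // recv_timeout errors surface here as Err; keep polling until exit is set.
        let res = processor.process_request_packets(
            &event_processor,
            &verified_receiver,
            &entry_sender,
            &blob_sender,
            &packet_recycler,
            &blob_recycler,
        );
        if res.is_err() && exit.load(Ordering::Relaxed) {
            break;
        }
    });
    (handle, entry_receiver)
}
```

Entries produced by `process_events` come back on the returned `entry_receiver`, so a downstream writer thread could record them and call `notify_entry_info_subscribers` per entry.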