From 43cd631579897bbef1b6720a4492d80f02ad0ea5 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 9 May 2018 14:56:34 -0600 Subject: [PATCH] Add thin_client_service --- src/accounting_stage.rs | 103 ------------------------------ src/ecdsa.rs | 2 +- src/lib.rs | 1 + src/thin_client.rs | 2 +- src/thin_client_service.rs | 127 +++++++++++++++++++++++++++++++++++++ src/tpu.rs | 21 ++++-- 6 files changed, 144 insertions(+), 112 deletions(-) create mode 100644 src/thin_client_service.rs diff --git a/src/accounting_stage.rs b/src/accounting_stage.rs index 95fffd6544..df2455e3f5 100644 --- a/src/accounting_stage.rs +++ b/src/accounting_stage.rs @@ -1,18 +1,14 @@ //! The `accounting_stage` module implements the accounting stage of the TPU. use accountant::Accountant; -use bincode::serialize; use entry::Entry; use event::Event; use hash::Hash; use historian::Historian; use recorder::Signal; use result::Result; -use signature::PublicKey; -use std::net::{SocketAddr, UdpSocket}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; -use transaction::Transaction; pub struct AccountingStage { pub output: Mutex>, @@ -20,7 +16,6 @@ pub struct AccountingStage { pub acc: Arc, historian_input: Mutex>, historian: Mutex, - entry_info_subscribers: Mutex>, } impl AccountingStage { @@ -33,7 +28,6 @@ impl AccountingStage { output: Mutex::new(output), entry_sender: Mutex::new(entry_sender), acc: Arc::new(acc), - entry_info_subscribers: Mutex::new(vec![]), historian_input: Mutex::new(historian_input), historian: Mutex::new(historian), } @@ -51,105 +45,8 @@ impl AccountingStage { let entry = historian.output.lock().unwrap().recv()?; self.acc.register_entry_id(&entry.id); self.entry_sender.lock().unwrap().send(entry)?; - - debug!("after historian_input"); Ok(()) } - - /// Process Request items sent by clients. - fn process_request( - &self, - msg: Request, - rsp_addr: SocketAddr, - ) -> Option<(Response, SocketAddr)> { - match msg { - Request::GetBalance { key } => { - let val = self.acc.get_balance(&key); - let rsp = (Response::Balance { key, val }, rsp_addr); - info!("Response::Balance {:?}", rsp); - Some(rsp) - } - Request::Transaction(_) => unreachable!(), - Request::Subscribe { subscriptions } => { - for subscription in subscriptions { - match subscription { - Subscription::EntryInfo => { - self.entry_info_subscribers.lock().unwrap().push(rsp_addr) - } - } - } - None - } - } - } - - pub fn process_requests( - &self, - reqs: Vec<(Request, SocketAddr)>, - ) -> Vec<(Response, SocketAddr)> { - reqs.into_iter() - .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) - .collect() - } - - pub fn notify_entry_info_subscribers(&self, entry: &Entry) { - // TODO: No need to bind(). - let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); - - // copy subscribers to avoid taking lock while doing io - let addrs = self.entry_info_subscribers.lock().unwrap().clone(); - trace!("Sending to {} addrs", addrs.len()); - for addr in addrs { - let entry_info = EntryInfo { - id: entry.id, - num_hashes: entry.num_hashes, - num_events: entry.events.len() as u64, - }; - let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); - trace!("sending {} to {}", data.len(), addr); - //TODO dont do IO here, this needs to be on a separate channel - let res = socket.send_to(&data, addr); - if res.is_err() { - eprintln!("couldn't send response: {:?}", res); - } - } - } -} - -#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Request { - Transaction(Transaction), - GetBalance { key: PublicKey }, - Subscribe { subscriptions: Vec }, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Subscription { - EntryInfo, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct EntryInfo { - pub id: Hash, - pub num_hashes: u64, - pub num_events: u64, -} - -impl Request { - /// Verify the request is valid. - pub fn verify(&self) -> bool { - match *self { - Request::Transaction(ref tr) => tr.verify_plan(), - _ => true, - } - } -} - -#[derive(Serialize, Deserialize, Debug)] -pub enum Response { - Balance { key: PublicKey, val: Option }, - EntryInfo(EntryInfo), } #[cfg(test)] diff --git a/src/ecdsa.rs b/src/ecdsa.rs index 9a18c9600e..9ac7959cf1 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -130,11 +130,11 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { #[cfg(test)] mod tests { - use accounting_stage::Request; use bincode::serialize; use ecdsa; use packet::{Packet, Packets, SharedPackets}; use std::sync::RwLock; + use thin_client_service::Request; use transaction::test_tx; use transaction::Transaction; diff --git a/src/lib.rs b/src/lib.rs index 29b3d36af7..10716a9eb8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ pub mod result; pub mod signature; pub mod streamer; pub mod thin_client; +pub mod thin_client_service; pub mod timing; pub mod tpu; pub mod transaction; diff --git a/src/thin_client.rs b/src/thin_client.rs index 1af59e8abc..74b7665d95 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -3,7 +3,6 @@ //! messages to the network directly. The binary encoding of its messages are //! unstable and may change in future releases. -use accounting_stage::{Request, Response, Subscription}; use bincode::{deserialize, serialize}; use futures::future::{ok, FutureResult}; use hash::Hash; @@ -11,6 +10,7 @@ use signature::{KeyPair, PublicKey, Signature}; use std::collections::HashMap; use std::io; use std::net::{SocketAddr, UdpSocket}; +use thin_client_service::{Request, Response, Subscription}; use transaction::Transaction; pub struct ThinClient { diff --git a/src/thin_client_service.rs b/src/thin_client_service.rs new file mode 100644 index 0000000000..6c87608e09 --- /dev/null +++ b/src/thin_client_service.rs @@ -0,0 +1,127 @@ +//! The `thin_client_service` sits alongside the TPU and queries it for information +//! on behalf of thing clients. + +use accountant::Accountant; +use bincode::serialize; +use entry::Entry; +use hash::Hash; +use signature::PublicKey; +use std::net::{SocketAddr, UdpSocket}; +//use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::{Arc, Mutex}; +use transaction::Transaction; + +pub struct ThinClientService { + //pub output: Mutex>, + //response_sender: Mutex>, + pub acc: Arc, + entry_info_subscribers: Mutex>, +} + +impl ThinClientService { + /// Create a new Tpu that wraps the given Accountant. + pub fn new(acc: Arc) -> Self { + //let (response_sender, output) = channel(); + ThinClientService { + //output: Mutex::new(output), + //response_sender: Mutex::new(response_sender), + acc, + entry_info_subscribers: Mutex::new(vec![]), + } + } + + /// Process Request items sent by clients. + fn process_request( + &self, + msg: Request, + rsp_addr: SocketAddr, + ) -> Option<(Response, SocketAddr)> { + match msg { + Request::GetBalance { key } => { + let val = self.acc.get_balance(&key); + let rsp = (Response::Balance { key, val }, rsp_addr); + info!("Response::Balance {:?}", rsp); + Some(rsp) + } + Request::Transaction(_) => unreachable!(), + Request::Subscribe { subscriptions } => { + for subscription in subscriptions { + match subscription { + Subscription::EntryInfo => { + self.entry_info_subscribers.lock().unwrap().push(rsp_addr) + } + } + } + None + } + } + } + + pub fn process_requests( + &self, + reqs: Vec<(Request, SocketAddr)>, + ) -> Vec<(Response, SocketAddr)> { + reqs.into_iter() + .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) + .collect() + } + + pub fn notify_entry_info_subscribers(&self, entry: &Entry) { + // TODO: No need to bind(). + let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); + + // copy subscribers to avoid taking lock while doing io + let addrs = self.entry_info_subscribers.lock().unwrap().clone(); + trace!("Sending to {} addrs", addrs.len()); + for addr in addrs { + let entry_info = EntryInfo { + id: entry.id, + num_hashes: entry.num_hashes, + num_events: entry.events.len() as u64, + }; + let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); + trace!("sending {} to {}", data.len(), addr); + //TODO dont do IO here, this needs to be on a separate channel + let res = socket.send_to(&data, addr); + if res.is_err() { + eprintln!("couldn't send response: {:?}", res); + } + } + } +} + +#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Request { + Transaction(Transaction), + GetBalance { key: PublicKey }, + Subscribe { subscriptions: Vec }, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Subscription { + EntryInfo, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EntryInfo { + pub id: Hash, + pub num_hashes: u64, + pub num_events: u64, +} + +impl Request { + /// Verify the request is valid. + pub fn verify(&self) -> bool { + match *self { + Request::Transaction(ref tr) => tr.verify_plan(), + _ => true, + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum Response { + Balance { key: PublicKey, val: Option }, + EntryInfo(EntryInfo), +} diff --git a/src/tpu.rs b/src/tpu.rs index 0d984093c3..77ca813487 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -2,7 +2,7 @@ //! 5-stage transaction processing pipeline in software. use accountant::Accountant; -use accounting_stage::{AccountingStage, Request, Response}; +use accounting_stage::AccountingStage; use bincode::{deserialize, serialize, serialize_into}; use crdt::{Crdt, ReplicatedData}; use ecdsa; @@ -26,10 +26,12 @@ use std::thread::{spawn, JoinHandle}; use std::time::Duration; use std::time::Instant; use streamer; +use thin_client_service::{Request, Response, ThinClientService}; use timing; pub struct Tpu { accounting: AccountingStage, + thin_client_service: ThinClientService, } type SharedTpu = Arc; @@ -37,7 +39,11 @@ type SharedTpu = Arc; impl Tpu { /// Create a new Tpu that wraps the given Accountant. pub fn new(accounting: AccountingStage) -> Self { - Tpu { accounting } + let thin_client_service = ThinClientService::new(accounting.acc.clone()); + Tpu { + accounting, + thin_client_service, + } } fn update_entry(obj: &SharedTpu, writer: &Arc>, entry: &Entry) { @@ -48,7 +54,8 @@ impl Tpu { "{}", serde_json::to_string(&entry).unwrap() ).unwrap(); - obj.accounting.notify_entry_info_subscribers(&entry); + obj.thin_client_service + .notify_entry_info_subscribers(&entry); } fn receive_all(obj: &SharedTpu, writer: &Arc>) -> Result> { @@ -335,7 +342,7 @@ impl Tpu { debug!("done process_events"); debug!("process_requests"); - let rsps = obj.accounting.process_requests(reqs); + let rsps = obj.thin_client_service.process_requests(reqs); debug!("done process_requests"); let blobs = Self::serialize_responses(rsps, blob_recycler)?; @@ -680,18 +687,18 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke #[cfg(test)] mod tests { - use bincode::serialize; - use ecdsa; - use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS}; use accountant::Accountant; use accounting_stage::AccountingStage; + use bincode::serialize; use chrono::prelude::*; use crdt::Crdt; + use ecdsa; use entry; use event::Event; use hash::{hash, Hash}; use logger; use mint::Mint; + use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS}; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; use std::sync::atomic::{AtomicBool, Ordering};