diff --git a/src/entry_writer.rs b/src/entry_writer.rs index f50a7d313b..ab973ba3bc 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -4,7 +4,6 @@ use accountant::Accountant; use entry::Entry; use ledger; use packet; -use request_processor::RequestProcessor; use result::Result; use serde_json; use std::collections::VecDeque; @@ -17,16 +16,12 @@ use streamer; pub struct EntryWriter<'a> { accountant: &'a Accountant, - request_processor: &'a RequestProcessor, } impl<'a> EntryWriter<'a> { /// Create a new Tpu that wraps the given Accountant. - pub fn new(accountant: &'a Accountant, request_processor: &'a RequestProcessor) -> Self { - EntryWriter { - accountant, - request_processor, - } + pub fn new(accountant: &'a Accountant) -> Self { + EntryWriter { accountant } } fn write_entry(&self, writer: &Mutex, entry: &Entry) { @@ -37,7 +32,6 @@ impl<'a> EntryWriter<'a> { "{}", serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry") ).expect("writeln! in fn write_entry"); - self.request_processor.notify_entry_info_subscribers(&entry); } fn write_entries( diff --git a/src/request.rs b/src/request.rs index 6b07ff38a2..d1c692ae42 100644 --- a/src/request.rs +++ b/src/request.rs @@ -14,19 +14,6 @@ pub enum Request { GetBalance { key: PublicKey }, GetLastId, GetTransactionCount, - Subscribe { subscriptions: Vec }, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Subscription { - EntryInfo, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct EntryInfo { - pub id: Hash, - pub num_hashes: u64, - pub num_events: u64, } impl Request { @@ -44,7 +31,6 @@ pub enum Response { Balance { key: PublicKey, val: Option }, LastId { id: Hash }, TransactionCount { transaction_count: u64 }, - EntryInfo(EntryInfo), } pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { diff --git a/src/request_processor.rs b/src/request_processor.rs index f2b6c51e25..4e6c815edf 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -8,12 +8,12 @@ use event_processor::EventProcessor; use packet; use packet::SharedPackets; use rayon::prelude::*; -use request::{EntryInfo, Request, Response, Subscription}; +use request::{Request, Response}; use result::Result; use std::collections::VecDeque; -use std::net::{SocketAddr, UdpSocket}; +use std::net::SocketAddr; +use std::sync::Arc; use std::sync::mpsc::{Receiver, Sender}; -use std::sync::{Arc, Mutex}; use std::time::Duration; use std::time::Instant; use streamer; @@ -21,16 +21,12 @@ use timing; pub struct RequestProcessor { accountant: Arc, - entry_info_subscribers: Mutex>, } impl RequestProcessor { /// Create a new Tpu that wraps the given Accountant. pub fn new(accountant: Arc) -> Self { - RequestProcessor { - accountant, - entry_info_subscribers: Mutex::new(vec![]), - } + RequestProcessor { accountant } } /// Process Request items sent by clients. @@ -59,16 +55,6 @@ impl RequestProcessor { Some(rsp) } Request::Transaction(_) => unreachable!(), - Request::Subscribe { subscriptions } => { - for subscription in subscriptions { - match subscription { - Subscription::EntryInfo => { - self.entry_info_subscribers.lock().unwrap().push(rsp_addr) - } - } - } - None - } } } @@ -81,29 +67,6 @@ impl RequestProcessor { .collect() } - pub fn notify_entry_info_subscribers(&self, entry: &Entry) { - // TODO: No need to bind(). - let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); - - // copy subscribers to avoid taking lock while doing io - let addrs = self.entry_info_subscribers.lock().unwrap().clone(); - trace!("Sending to {} addrs", addrs.len()); - for addr in addrs { - let entry_info = EntryInfo { - id: entry.id, - num_hashes: entry.num_hashes, - num_events: entry.events.len() as u64, - }; - let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); - trace!("sending {} to {}", data.len(), addr); - //TODO dont do IO here, this needs to be on a separate channel - let res = socket.send_to(&data, addr); - if res.is_err() { - eprintln!("couldn't send response: {:?}", res); - } - } - } - fn deserialize_requests(p: &packet::Packets) -> Vec> { p.packets .par_iter() diff --git a/src/rpu.rs b/src/rpu.rs index 2cea96fab3..b8954c5542 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -33,7 +33,6 @@ impl Rpu { fn write_service( accountant: Arc, - request_processor: Arc, exit: Arc, broadcast: streamer::BlobSender, blob_recycler: packet::BlobRecycler, @@ -41,7 +40,7 @@ impl Rpu { entry_receiver: Receiver, ) -> JoinHandle<()> { spawn(move || loop { - let entry_writer = EntryWriter::new(&accountant, &request_processor); + let entry_writer = EntryWriter::new(&accountant); let _ = entry_writer.write_and_send_entries( &broadcast, &blob_recycler, @@ -99,7 +98,6 @@ impl Rpu { let (broadcast_sender, broadcast_receiver) = channel(); let t_write = Self::write_service( self.event_processor.accountant.clone(), - request_stage.request_processor.clone(), exit.clone(), broadcast_sender, blob_recycler.clone(), diff --git a/src/thin_client.rs b/src/thin_client.rs index 9a6be16024..2bc2908432 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -6,7 +6,7 @@ use bincode::{deserialize, serialize}; use futures::future::{ok, FutureResult}; use hash::Hash; -use request::{Request, Response, Subscription}; +use request::{Request, Response}; use signature::{KeyPair, PublicKey, Signature}; use std::collections::HashMap; use std::io; @@ -18,7 +18,6 @@ pub struct ThinClient { pub requests_socket: UdpSocket, pub events_socket: UdpSocket, last_id: Option, - num_events: u64, transaction_count: u64, balances: HashMap>, } @@ -33,22 +32,12 @@ impl ThinClient { requests_socket, events_socket, last_id: None, - num_events: 0, transaction_count: 0, balances: HashMap::new(), }; - client.init(); client } - pub fn init(&self) { - let subscriptions = vec![Subscription::EntryInfo]; - let req = Request::Subscribe { subscriptions }; - let data = serialize(&req).expect("serialize Subscribe in thin_client"); - trace!("subscribing to {}", self.addr); - let _res = self.requests_socket.send_to(&data, &self.addr); - } - pub fn recv_response(&self) -> io::Result { let mut buf = vec![0u8; 1024]; info!("start recv_from"); @@ -72,11 +61,6 @@ impl ThinClient { info!("Response transaction count {:?}", transaction_count); self.transaction_count = transaction_count; } - Response::EntryInfo(entry_info) => { - trace!("Response entry_info {:?}", entry_info.id); - self.last_id = Some(entry_info.id); - self.num_events += entry_info.num_events; - } } } diff --git a/src/tvu.rs b/src/tvu.rs index d486a5888a..8deedbf23c 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -34,12 +34,11 @@ impl Tvu { fn drain_service( accountant: Arc, - request_processor: Arc, exit: Arc, entry_receiver: Receiver, ) -> JoinHandle<()> { spawn(move || { - let entry_writer = EntryWriter::new(&accountant, &request_processor); + let entry_writer = EntryWriter::new(&accountant); loop { let _ = entry_writer.drain_entries(&entry_receiver); if exit.load(Ordering::Relaxed) { @@ -183,7 +182,6 @@ impl Tvu { let t_write = Self::drain_service( obj.event_processor.accountant.clone(), - request_stage.request_processor.clone(), exit.clone(), request_stage.entry_receiver, );