From 600a1f88662e57968b5ac3ae87ebaf1a17c7d40d Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 11 May 2018 16:35:53 -0600 Subject: [PATCH] Initialize thin client with events port --- src/bin/client-demo.rs | 34 ++++++++++++++++++-------- src/thin_client.rs | 55 ++++++++++++++++++++++++++---------------- 2 files changed, 58 insertions(+), 31 deletions(-) diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 5b308a0f6..da47e0e63 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -35,7 +35,7 @@ fn print_usage(program: &str, opts: Options) { fn main() { let mut threads = 4usize; let mut addr: String = "127.0.0.1:8000".to_string(); - let mut client_addr: String = "127.0.0.1:8010".to_string(); + let mut requests_addr: String = "127.0.0.1:8010".to_string(); let mut opts = Options::new(); opts.optopt("s", "", "server address", "host:port"); @@ -60,12 +60,16 @@ fn main() { addr = matches.opt_str("s").unwrap(); } if matches.opt_present("c") { - client_addr = matches.opt_str("c").unwrap(); + requests_addr = matches.opt_str("c").unwrap(); } if matches.opt_present("t") { threads = matches.opt_str("t").unwrap().parse().expect("integer"); } + let mut events_addr: SocketAddr = requests_addr.parse().unwrap(); + let requests_port = events_addr.port(); + events_addr.set_port(requests_port + 1); + if stdin_isatty() { eprintln!("nothing found on stdin, expected a json file"); exit(1); @@ -84,10 +88,13 @@ fn main() { exit(1); }); - println!("Binding to {}", client_addr); - let socket = UdpSocket::bind(&client_addr).unwrap(); - socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); - let mut client = ThinClient::new(addr.parse().unwrap(), socket); + println!("Binding to {}", requests_addr); + let requests_socket = UdpSocket::bind(&requests_addr).unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(5, 0))) + .unwrap(); + let events_socket = UdpSocket::bind(&events_addr).unwrap(); + let mut client = ThinClient::new(addr.parse().unwrap(), requests_socket, events_socket); println!("Get last ID..."); let last_id = client.get_last_id().wait().unwrap(); @@ -131,10 +138,17 @@ fn main() { let chunks: Vec<_> = transactions.chunks(sz).collect(); chunks.into_par_iter().for_each(|trs| { println!("Transferring 1 unit {} times... to", trs.len()); - let mut client_addr: SocketAddr = client_addr.parse().unwrap(); - client_addr.set_port(0); - let socket = UdpSocket::bind(client_addr).unwrap(); - let client = ThinClient::new(addr.parse().unwrap(), socket); + let mut requests_addr: SocketAddr = requests_addr.parse().unwrap(); + requests_addr.set_port(0); + let requests_socket = UdpSocket::bind(requests_addr).unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(5, 0))) + .unwrap(); + let mut events_addr: SocketAddr = requests_addr.clone(); + let requests_port = events_addr.port(); + events_addr.set_port(requests_port + 1); + let events_socket = UdpSocket::bind(&events_addr).unwrap(); + let client = ThinClient::new(addr.parse().unwrap(), requests_socket, events_socket); for tr in trs { client.transfer_signed(tr.clone()).unwrap(); } diff --git a/src/thin_client.rs b/src/thin_client.rs index d7ee25ab4..852338f7c 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -15,7 +15,8 @@ use transaction::Transaction; pub struct ThinClient { pub addr: SocketAddr, - pub socket: UdpSocket, + pub requests_socket: UdpSocket, + pub events_socket: UdpSocket, last_id: Option, num_events: u64, balances: HashMap>, @@ -23,12 +24,13 @@ pub struct ThinClient { impl ThinClient { /// Create a new ThinClient that will interface with Tpu - /// over `socket`. To receive responses, the caller must bind `socket` + /// over `requests_socket` and `events_socket`. To receive responses, the caller must bind `socket` /// to a public address before invoking ThinClient methods. - pub fn new(addr: SocketAddr, socket: UdpSocket) -> Self { + pub fn new(addr: SocketAddr, requests_socket: UdpSocket, events_socket: UdpSocket) -> Self { let client = ThinClient { addr: addr, - socket, + requests_socket, + events_socket, last_id: None, num_events: 0, balances: HashMap::new(), @@ -42,13 +44,13 @@ impl ThinClient { let req = Request::Subscribe { subscriptions }; let data = serialize(&req).expect("serialize Subscribe in thin_client"); trace!("subscribing to {}", self.addr); - let _res = self.socket.send_to(&data, &self.addr); + let _res = self.requests_socket.send_to(&data, &self.addr); } pub fn recv_response(&self) -> io::Result { let mut buf = vec![0u8; 1024]; info!("start recv_from"); - self.socket.recv_from(&mut buf)?; + self.requests_socket.recv_from(&mut buf)?; info!("end recv_from"); let resp = deserialize(&buf).expect("deserialize balance in thin_client"); Ok(resp) @@ -73,7 +75,7 @@ impl ThinClient { pub fn transfer_signed(&self, tr: Transaction) -> io::Result { let req = Request::Transaction(tr); let data = serialize(&req).expect("serialize Transaction in pub fn transfer_signed"); - self.socket.send_to(&data, &self.addr) + self.requests_socket.send_to(&data, &self.addr) } /// Creates, signs, and processes a Transaction. Useful for writing unit-tests. @@ -96,7 +98,7 @@ impl ThinClient { info!("get_balance"); let req = Request::GetBalance { key: *pubkey }; let data = serialize(&req).expect("serialize GetBalance in pub fn get_balance"); - self.socket + self.requests_socket .send_to(&data, &self.addr) .expect("buffer error in pub fn get_balance"); let mut done = false; @@ -133,7 +135,7 @@ impl ThinClient { } // Then take the rest. - self.socket + self.requests_socket .set_nonblocking(true) .expect("set_nonblocking in pub fn transaction_count"); loop { @@ -142,7 +144,7 @@ impl ThinClient { Ok(resp) => self.process_response(resp), } } - self.socket + self.requests_socket .set_nonblocking(false) .expect("set_nonblocking in pub fn transaction_count"); self.num_events @@ -193,9 +195,10 @@ mod tests { Tpu::serve(&tpu, d, serve, events_socket, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(300)); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(addr, socket); + let mut client = ThinClient::new(addr, requests_socket, events_socket); let last_id = client.get_last_id().wait().unwrap(); let _sig = client .transfer(500, &alice.keypair(), bob_pubkey, &last_id) @@ -239,9 +242,12 @@ mod tests { ).unwrap(); sleep(Duration::from_millis(300)); - let socket = UdpSocket::bind("127.0.0.1:0").unwrap(); - socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); - let mut client = ThinClient::new(serve_addr, socket); + let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(5, 0))) + .unwrap(); + let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + let mut client = ThinClient::new(serve_addr, requests_socket, events_socket); let last_id = client.get_last_id().wait().unwrap(); trace!("doing stuff"); @@ -362,10 +368,13 @@ mod tests { //verify leader can do transfer let leader_balance = { - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(leader.0.serve_addr, socket); + let mut client = ThinClient::new(leader.0.serve_addr, requests_socket, events_socket); info!("getting leader last_id"); let last_id = client.get_last_id().wait().unwrap(); info!("executing leader transer"); @@ -379,10 +388,14 @@ mod tests { //verify replicant has the same balance let mut replicant_balance = 0; for _ in 0..10 { - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(replicant.0.serve_addr, socket); + let mut client = + ThinClient::new(replicant.0.serve_addr, requests_socket, events_socket); info!("getting replicant balance"); if let Ok(bal) = client.get_balance(&bob_pubkey) { replicant_balance = bal;