From 8454eb79d0c0a818118e051c4e9a2c7278059649 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Tue, 22 May 2018 09:46:52 -0700 Subject: [PATCH] Send events to the right address and set recv socket timeout --- src/bin/client-demo.rs | 32 ++++++++++++----- src/bin/testnode.rs | 1 + src/streamer.rs | 4 +++ src/thin_client.rs | 82 +++++++++++++++++++++++++----------------- src/tvu.rs | 1 - 5 files changed, 77 insertions(+), 43 deletions(-) diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index da47e0e632..35754f1a24 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -34,7 +34,7 @@ fn print_usage(program: &str, opts: Options) { fn main() { let mut threads = 4usize; - let mut addr: String = "127.0.0.1:8000".to_string(); + let mut server_addr: String = "127.0.0.1:8000".to_string(); let mut requests_addr: String = "127.0.0.1:8010".to_string(); let mut opts = Options::new(); @@ -57,7 +57,7 @@ fn main() { return; } if matches.opt_present("s") { - addr = matches.opt_str("s").unwrap(); + server_addr = matches.opt_str("s").unwrap(); } if matches.opt_present("c") { requests_addr = matches.opt_str("c").unwrap(); @@ -94,7 +94,16 @@ fn main() { .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); let events_socket = UdpSocket::bind(&events_addr).unwrap(); - let mut client = ThinClient::new(addr.parse().unwrap(), requests_socket, events_socket); + let requests_addr: SocketAddr = server_addr.parse().unwrap(); + let requests_port = requests_addr.port(); + let mut events_server_addr = requests_addr.clone(); + events_server_addr.set_port(requests_port + 3); + let mut client = ThinClient::new( + requests_addr, + requests_socket, + events_server_addr, + events_socket, + ); println!("Get last ID..."); let last_id = client.get_last_id().wait().unwrap(); @@ -138,17 +147,22 @@ fn main() { let chunks: Vec<_> = transactions.chunks(sz).collect(); chunks.into_par_iter().for_each(|trs| { println!("Transferring 1 unit {} times... to", trs.len()); - let mut requests_addr: SocketAddr = requests_addr.parse().unwrap(); - requests_addr.set_port(0); - let requests_socket = UdpSocket::bind(requests_addr).unwrap(); + let requests_addr: SocketAddr = server_addr.parse().unwrap(); + let mut requests_cb_addr = requests_addr.clone(); + requests_cb_addr.set_port(0); + let requests_socket = UdpSocket::bind(requests_cb_addr).unwrap(); requests_socket .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); let mut events_addr: SocketAddr = requests_addr.clone(); - let requests_port = events_addr.port(); - events_addr.set_port(requests_port + 1); + events_addr.set_port(0); let events_socket = UdpSocket::bind(&events_addr).unwrap(); - let client = ThinClient::new(addr.parse().unwrap(), requests_socket, events_socket); + let client = ThinClient::new( + requests_addr, + requests_socket, + events_server_addr, + events_socket, + ); for tr in trs { client.transfer_signed(tr.clone()).unwrap(); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index ca4a6f7a34..864d668fe3 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -55,6 +55,7 @@ fn main() { let gossip_addr = format!("0.0.0.0:{}", port + 1); let replicate_addr = format!("0.0.0.0:{}", port + 2); let events_addr = format!("0.0.0.0:{}", port + 3); + eprintln!("events_addr: {:?}", events_addr); if stdin_isatty() { eprintln!("nothing found on stdin, expected a log file"); diff --git a/src/streamer.rs b/src/streamer.rs index 7186421871..ee83cd25d0 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -53,6 +53,10 @@ pub fn receiver( recycler: PacketRecycler, packet_sender: PacketSender, ) -> JoinHandle<()> { + let res = sock.set_read_timeout(Some(Duration::new(1, 0))); + if res.is_err() { + panic!("streamer::receiver set_read_timeout error"); + } spawn(move || { let _ = recv_loop(&sock, &exit, &recycler, &packet_sender); () diff --git a/src/thin_client.rs b/src/thin_client.rs index bdf43d3199..2c8af045a8 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -15,9 +15,10 @@ use std::net::{SocketAddr, UdpSocket}; use transaction::Transaction; pub struct ThinClient { - pub addr: SocketAddr, - pub requests_socket: UdpSocket, - pub events_socket: UdpSocket, + requests_addr: SocketAddr, + requests_socket: UdpSocket, + events_addr: SocketAddr, + events_socket: UdpSocket, last_id: Option, transaction_count: u64, balances: HashMap>, @@ -27,10 +28,16 @@ impl ThinClient { /// Create a new ThinClient that will interface with Rpu /// over `requests_socket` and `events_socket`. To receive responses, the caller must bind `socket` /// to a public address before invoking ThinClient methods. - pub fn new(addr: SocketAddr, requests_socket: UdpSocket, events_socket: UdpSocket) -> Self { + pub fn new( + requests_addr: SocketAddr, + requests_socket: UdpSocket, + events_addr: SocketAddr, + events_socket: UdpSocket, + ) -> Self { let client = ThinClient { - addr: addr, + requests_addr, requests_socket, + events_addr, events_socket, last_id: None, transaction_count: 0, @@ -70,7 +77,7 @@ impl ThinClient { pub fn transfer_signed(&self, tr: Transaction) -> io::Result { let event = Event::Transaction(tr); let data = serialize(&event).expect("serialize Transaction in pub fn transfer_signed"); - self.events_socket.send_to(&data, &self.addr) + self.events_socket.send_to(&data, &self.events_addr) } /// Creates, signs, and processes a Transaction. Useful for writing unit-tests. @@ -94,7 +101,7 @@ impl ThinClient { let req = Request::GetBalance { key: *pubkey }; let data = serialize(&req).expect("serialize GetBalance in pub fn get_balance"); self.requests_socket - .send_to(&data, &self.addr) + .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn get_balance"); let mut done = false; while !done { @@ -116,7 +123,7 @@ impl ThinClient { let data = serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count"); self.requests_socket - .send_to(&data, &self.addr) + .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn transaction_count"); let mut done = false; while !done { @@ -137,12 +144,11 @@ impl ThinClient { let req = Request::GetLastId; let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id"); self.requests_socket - .send_to(&data, &self.addr) + .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn get_last_id"); let mut done = false; while !done { let resp = self.recv_response().expect("get_last_id response"); - info!("recv_response {:?}", resp); if let &Response::LastId { .. } = &resp { done = true; } @@ -152,6 +158,22 @@ impl ThinClient { } } +#[cfg(test)] +pub fn poll_get_balance(client: &mut ThinClient, pubkey: &PublicKey) -> io::Result { + use std::time::Instant; + + let mut balance; + let now = Instant::now(); + loop { + balance = client.get_balance(pubkey); + if balance.is_ok() || now.elapsed().as_secs() > 1 { + break; + } + } + + balance +} + #[cfg(test)] mod tests { use super::*; @@ -169,12 +191,10 @@ mod tests { use std::thread::JoinHandle; use std::thread::sleep; use std::time::Duration; - use std::time::Instant; use streamer::default_window; use tvu::{self, Tvu}; #[test] - #[ignore] fn test_thin_client() { logger::setup(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -183,6 +203,7 @@ mod tests { .set_read_timeout(Some(Duration::new(1, 0))) .unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let events_addr = events_socket.local_addr().unwrap(); let addr = requests_socket.local_addr().unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( @@ -220,22 +241,12 @@ mod tests { let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(addr, requests_socket, events_socket); + let mut client = ThinClient::new(addr, requests_socket, events_addr, events_socket); let last_id = client.get_last_id().wait().unwrap(); let _sig = client .transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); - let mut balance; - let now = Instant::now(); - loop { - balance = client.get_balance(&bob_pubkey); - if balance.is_ok() { - break; - } - if now.elapsed().as_secs() > 0 { - break; - } - } + let balance = poll_get_balance(&mut client, &bob_pubkey); assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); for t in server.thread_hdls { @@ -244,8 +255,8 @@ mod tests { } #[test] - #[ignore] fn test_bad_sig() { + logger::setup(); let (leader_data, leader_gossip, _, leader_serve, _leader_events) = tvu::test_node(); let alice = Mint::new(10_000); let bank = Bank::new(&alice); @@ -258,6 +269,7 @@ mod tests { let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + let events_addr = events_socket.local_addr().unwrap(); let server = Server::new( bank, @@ -279,11 +291,9 @@ mod tests { .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); - let mut client = ThinClient::new(serve_addr, requests_socket, events_socket); + let mut client = ThinClient::new(serve_addr, requests_socket, events_addr, events_socket); let last_id = client.get_last_id().wait().unwrap(); - trace!("doing stuff"); - let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id); let _sig = client.transfer_signed(tr).unwrap(); @@ -295,10 +305,9 @@ mod tests { tr2.data.plan = Plan::new_payment(502, bob_pubkey); let _sig = client.transfer_signed(tr2).unwrap(); - assert_eq!(client.get_balance(&bob_pubkey).unwrap(), 500); - trace!("exiting"); + let balance = poll_get_balance(&mut client, &bob_pubkey); + assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); - trace!("joining threads"); for t in server.thread_hdls { t.join().unwrap(); } @@ -405,6 +414,7 @@ mod tests { local.set_port(0); let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + let events_addr = leader.4.local_addr().unwrap(); let server = Server::new( leader_bank, @@ -435,7 +445,12 @@ mod tests { .unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(leader.0.serve_addr, requests_socket, events_socket); + let mut client = ThinClient::new( + leader.0.serve_addr, + requests_socket, + events_addr, + events_socket, + ); trace!("getting leader last_id"); let last_id = client.get_last_id().wait().unwrap(); info!("executing leader transer"); @@ -455,7 +470,8 @@ mod tests { .unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(*serve_addr, requests_socket, events_socket); + let mut client = + ThinClient::new(*serve_addr, requests_socket, events_addr, events_socket); for i in 0..10 { trace!("getting replicant balance {} {}/10", *serve_addr, i); if let Ok(bal) = client.get_balance(&bob_pubkey) { diff --git a/src/tvu.rs b/src/tvu.rs index f0ec2ccac6..08024c299a 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -235,7 +235,6 @@ mod tests { /// Test that mesasge sent from leader to target1 and repliated to target2 #[test] - #[ignore] fn test_replicate() { logger::setup(); let (leader_data, leader_gossip, _, leader_serve, _) = test_node();