Initialize thin client with events port

This commit is contained in:
Greg Fitzgerald 2018-05-11 16:35:53 -06:00
parent 95bf68f3f5
commit 600a1f8866
2 changed files with 58 additions and 31 deletions

View File

@ -35,7 +35,7 @@ fn print_usage(program: &str, opts: Options) {
fn main() {
let mut threads = 4usize;
let mut addr: String = "127.0.0.1:8000".to_string();
let mut client_addr: String = "127.0.0.1:8010".to_string();
let mut requests_addr: String = "127.0.0.1:8010".to_string();
let mut opts = Options::new();
opts.optopt("s", "", "server address", "host:port");
@ -60,12 +60,16 @@ fn main() {
addr = matches.opt_str("s").unwrap();
}
if matches.opt_present("c") {
client_addr = matches.opt_str("c").unwrap();
requests_addr = matches.opt_str("c").unwrap();
}
if matches.opt_present("t") {
threads = matches.opt_str("t").unwrap().parse().expect("integer");
}
let mut events_addr: SocketAddr = requests_addr.parse().unwrap();
let requests_port = events_addr.port();
events_addr.set_port(requests_port + 1);
if stdin_isatty() {
eprintln!("nothing found on stdin, expected a json file");
exit(1);
@ -84,10 +88,13 @@ fn main() {
exit(1);
});
println!("Binding to {}", client_addr);
let socket = UdpSocket::bind(&client_addr).unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
let mut client = ThinClient::new(addr.parse().unwrap(), socket);
println!("Binding to {}", requests_addr);
let requests_socket = UdpSocket::bind(&requests_addr).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let events_socket = UdpSocket::bind(&events_addr).unwrap();
let mut client = ThinClient::new(addr.parse().unwrap(), requests_socket, events_socket);
println!("Get last ID...");
let last_id = client.get_last_id().wait().unwrap();
@ -131,10 +138,17 @@ fn main() {
let chunks: Vec<_> = transactions.chunks(sz).collect();
chunks.into_par_iter().for_each(|trs| {
println!("Transferring 1 unit {} times... to", trs.len());
let mut client_addr: SocketAddr = client_addr.parse().unwrap();
client_addr.set_port(0);
let socket = UdpSocket::bind(client_addr).unwrap();
let client = ThinClient::new(addr.parse().unwrap(), socket);
let mut requests_addr: SocketAddr = requests_addr.parse().unwrap();
requests_addr.set_port(0);
let requests_socket = UdpSocket::bind(requests_addr).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let mut events_addr: SocketAddr = requests_addr.clone();
let requests_port = events_addr.port();
events_addr.set_port(requests_port + 1);
let events_socket = UdpSocket::bind(&events_addr).unwrap();
let client = ThinClient::new(addr.parse().unwrap(), requests_socket, events_socket);
for tr in trs {
client.transfer_signed(tr.clone()).unwrap();
}

View File

@ -15,7 +15,8 @@ use transaction::Transaction;
pub struct ThinClient {
pub addr: SocketAddr,
pub socket: UdpSocket,
pub requests_socket: UdpSocket,
pub events_socket: UdpSocket,
last_id: Option<Hash>,
num_events: u64,
balances: HashMap<PublicKey, Option<i64>>,
@ -23,12 +24,13 @@ pub struct ThinClient {
impl ThinClient {
/// Create a new ThinClient that will interface with Tpu
/// over `socket`. To receive responses, the caller must bind `socket`
/// over `requests_socket` and `events_socket`. To receive responses, the caller must bind `socket`
/// to a public address before invoking ThinClient methods.
pub fn new(addr: SocketAddr, socket: UdpSocket) -> Self {
pub fn new(addr: SocketAddr, requests_socket: UdpSocket, events_socket: UdpSocket) -> Self {
let client = ThinClient {
addr: addr,
socket,
requests_socket,
events_socket,
last_id: None,
num_events: 0,
balances: HashMap::new(),
@ -42,13 +44,13 @@ impl ThinClient {
let req = Request::Subscribe { subscriptions };
let data = serialize(&req).expect("serialize Subscribe in thin_client");
trace!("subscribing to {}", self.addr);
let _res = self.socket.send_to(&data, &self.addr);
let _res = self.requests_socket.send_to(&data, &self.addr);
}
pub fn recv_response(&self) -> io::Result<Response> {
let mut buf = vec![0u8; 1024];
info!("start recv_from");
self.socket.recv_from(&mut buf)?;
self.requests_socket.recv_from(&mut buf)?;
info!("end recv_from");
let resp = deserialize(&buf).expect("deserialize balance in thin_client");
Ok(resp)
@ -73,7 +75,7 @@ impl ThinClient {
pub fn transfer_signed(&self, tr: Transaction) -> io::Result<usize> {
let req = Request::Transaction(tr);
let data = serialize(&req).expect("serialize Transaction in pub fn transfer_signed");
self.socket.send_to(&data, &self.addr)
self.requests_socket.send_to(&data, &self.addr)
}
/// Creates, signs, and processes a Transaction. Useful for writing unit-tests.
@ -96,7 +98,7 @@ impl ThinClient {
info!("get_balance");
let req = Request::GetBalance { key: *pubkey };
let data = serialize(&req).expect("serialize GetBalance in pub fn get_balance");
self.socket
self.requests_socket
.send_to(&data, &self.addr)
.expect("buffer error in pub fn get_balance");
let mut done = false;
@ -133,7 +135,7 @@ impl ThinClient {
}
// Then take the rest.
self.socket
self.requests_socket
.set_nonblocking(true)
.expect("set_nonblocking in pub fn transaction_count");
loop {
@ -142,7 +144,7 @@ impl ThinClient {
Ok(resp) => self.process_response(resp),
}
}
self.socket
self.requests_socket
.set_nonblocking(false)
.expect("set_nonblocking in pub fn transaction_count");
self.num_events
@ -193,9 +195,10 @@ mod tests {
Tpu::serve(&tpu, d, serve, events_socket, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(300));
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(addr, socket);
let mut client = ThinClient::new(addr, requests_socket, events_socket);
let last_id = client.get_last_id().wait().unwrap();
let _sig = client
.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
@ -239,9 +242,12 @@ mod tests {
).unwrap();
sleep(Duration::from_millis(300));
let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
let mut client = ThinClient::new(serve_addr, socket);
let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
let mut client = ThinClient::new(serve_addr, requests_socket, events_socket);
let last_id = client.get_last_id().wait().unwrap();
trace!("doing stuff");
@ -362,10 +368,13 @@ mod tests {
//verify leader can do transfer
let leader_balance = {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(leader.0.serve_addr, socket);
let mut client = ThinClient::new(leader.0.serve_addr, requests_socket, events_socket);
info!("getting leader last_id");
let last_id = client.get_last_id().wait().unwrap();
info!("executing leader transer");
@ -379,10 +388,14 @@ mod tests {
//verify replicant has the same balance
let mut replicant_balance = 0;
for _ in 0..10 {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(replicant.0.serve_addr, socket);
let mut client =
ThinClient::new(replicant.0.serve_addr, requests_socket, events_socket);
info!("getting replicant balance");
if let Ok(bal) = client.get_balance(&bob_pubkey) {
replicant_balance = bal;