diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 224fb5c01..9c7c41049 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -26,19 +26,21 @@ impl AccountantStub { /// over `socket`. To receive responses, the caller must bind `socket` /// to a public address before invoking AccountantStub methods. pub fn new(addr: &str, socket: UdpSocket) -> Self { - AccountantStub { + let stub = AccountantStub { addr: addr.to_string(), socket, last_id: None, num_events: 0, balances: HashMap::new(), - } + }; + stub.init(); + stub } pub fn init(&self) { let subscriptions = vec![Subscription::EntryInfo]; let req = Request::Subscribe { subscriptions }; - let data = serialize(&req).expect("serialize GetBalance"); + let data = serialize(&req).expect("serialize Subscribe"); let _res = self.socket.send_to(&data, &self.addr); } @@ -94,8 +96,14 @@ impl AccountantStub { self.socket .send_to(&data, &self.addr) .expect("buffer error"); - let resp = self.recv_response().expect("recv response"); - self.process_response(resp); + let mut done = false; + while !done { + let resp = self.recv_response().expect("recv response"); + if let &Response::Balance { ref key, .. } = &resp { + done = key == pubkey; + } + self.process_response(resp); + } ok(self.balances[pubkey].unwrap()) } @@ -116,7 +124,7 @@ impl AccountantStub { /// Return the number of transactions the server processed since creating /// this stub instance. - pub fn get_transaction_count(&self) -> u64 { + pub fn transaction_count(&self) -> u64 { self.num_events } } diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 559c87cdb..831d2521b 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -18,9 +18,9 @@ use std::env; use std::io::{stdin, Read}; use std::net::UdpSocket; use std::process::exit; -use std::time::Instant; +use std::thread::sleep; +use std::time::{Duration, Instant}; use untrusted::Input; -//use std::sync::mpsc::sync_channel; fn print_usage(program: &str, opts: Options) { let mut brief = format!("Usage: cat | {} [options]\n\n", program); @@ -110,6 +110,8 @@ fn main() { nsps / 1_000_f64 ); + let initial_tx_count = acc.transaction_count(); + println!("Transfering {} transactions in {} batches", txs, threads); let now = Instant::now(); let sz = transactions.len() / threads; @@ -118,17 +120,23 @@ fn main() { println!("Transferring 1 unit {} times...", trs.len()); let send_addr = "0.0.0.0:0"; let socket = UdpSocket::bind(send_addr).unwrap(); - //let (entry_info_sender, receiver) = sync_channel(1000); - //let acc = AccountantStub::new_thin_client(&addr, socket, entry_info_sender); let acc = AccountantStub::new(&addr, socket); for tr in trs { acc.transfer_signed(tr.clone()).unwrap(); } - - println!("Waiting for the server to go idle...",); - //while receiver.recv().unwrap().num_events > 0 {} }); + let mut tx_count = acc.transaction_count(); + println!("tx count {}", tx_count); + let mut prev_tx_count = tx_count + 1; + + println!("Waiting for the server to go idle...",); + while tx_count != prev_tx_count { + sleep(Duration::from_millis(20)); + prev_tx_count = tx_count; + tx_count = acc.transaction_count(); + } + let txs = tx_count - initial_tx_count; println!("Sent transactions {}", txs); let duration = now.elapsed();