Fix up client demo

This commit is contained in:
Greg Fitzgerald 2018-03-28 14:40:58 -06:00
parent 27f29019ef
commit 849bced602
3 changed files with 21 additions and 17 deletions

View File

@ -1,12 +1,12 @@
use accountant::Accountant;
use bincode::{deserialize, serialize, serialize_into};
use bincode::{deserialize, serialize};
use entry::Entry;
use hash::Hash;
use result::Result;
use serde_json;
use signature::PublicKey;
use std::default::Default;
use std::io::Write;
use std::io::{ErrorKind, Write};
use std::net::{TcpListener, TcpStream, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
@ -52,12 +52,14 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
pub fn sync(&mut self) -> Hash {
while let Ok(entry) = self.acc.historian.receiver.try_recv() {
self.last_id = entry.id;
write!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
for mut subscriber in &self.subscribers {
// TODO: Handle errors. If TCP stream is closed, remove it.
serialize_into(subscriber, &entry).unwrap();
}
let buf = serialize(&entry).expect("serialize");
self.subscribers
.retain(|ref mut subscriber| match subscriber.write(&buf) {
Err(err) => err.kind() != ErrorKind::BrokenPipe,
_ => true,
});
}
self.last_id
}

View File

@ -84,7 +84,7 @@ impl AccountantStub {
let mut buf = vec![0u8; 65_535];
let mut buf_offset = 0;
let mut found = false;
if let Ok(bytes) = self.stream.read(&mut buf) {
if let Ok(_bytes) = self.stream.read(&mut buf) {
loop {
match deserialize::<Entry>(&buf[buf_offset..]) {
Ok(entry) => {
@ -100,10 +100,7 @@ impl AccountantStub {
}
}
}
Err(_) => {
println!("read {} of {} in buf", buf_offset, bytes);
break;
}
Err(_) => break,
}
}
}

View File

@ -18,14 +18,17 @@ fn main() {
let mint_pubkey = mint.pubkey();
let socket = UdpSocket::bind(send_addr).unwrap();
let stream = TcpStream::connect(send_addr).unwrap();
let stream = TcpStream::connect(addr).unwrap();
stream.set_nonblocking(true).expect("nonblocking");
let mut acc = AccountantStub::new(addr, socket, stream);
let last_id = acc.get_last_id().unwrap();
let txs = acc.get_balance(&mint_pubkey).unwrap().unwrap();
println!("Mint's Initial Balance {}", txs);
let mint_balance = acc.get_balance(&mint_pubkey).unwrap().unwrap();
println!("Mint's Initial Balance {}", mint_balance);
println!("Signing transactions...");
let txs = mint_balance;
let now = Instant::now();
let transactions: Vec<_> = (0..txs)
.map(|_| {
@ -66,7 +69,9 @@ fn main() {
acc.transfer_signed(tr).unwrap();
}
println!("Waiting for last transaction to be confirmed...",);
acc.wait_on_signature(&sig, &last_id).unwrap();
if txs > 0 {
acc.wait_on_signature(&sig, &last_id).unwrap();
}
let duration = now.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
@ -74,5 +79,5 @@ fn main() {
println!("Done. {} tps!", tps);
let val = acc.get_balance(&mint_pubkey).unwrap().unwrap();
println!("Mint's Final Balance {}", val);
assert_eq!(val, 0);
assert_eq!(val, mint_balance - txs);
}