From 13206e4976331f22ec8f4fc1d91d785a97e9beee Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 27 Mar 2018 14:45:04 -0600 Subject: [PATCH] Let clients subscribe to the ledger over TCP TODO: Add more tests Fixes #27 --- src/accountant_skel.rs | 52 ++++++++++++++++++++++++++------------- src/accountant_stub.rs | 55 ++++++++++++++++++++++++------------------ src/bin/client-demo.rs | 5 ++-- 3 files changed, 70 insertions(+), 42 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index d4c175edb..7b4deb0c1 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -1,5 +1,5 @@ use accountant::Accountant; -use bincode::{deserialize, serialize}; +use bincode::{deserialize, serialize, serialize_into}; use entry::Entry; use hash::Hash; use result::Result; @@ -7,7 +7,7 @@ use serde_json; use signature::PublicKey; use std::default::Default; use std::io::Write; -use std::net::UdpSocket; +use std::net::{TcpListener, TcpStream, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; @@ -21,6 +21,7 @@ pub struct AccountantSkel { pub last_id: Hash, pub ledger: Vec, writer: W, + subscribers: Vec, } #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] @@ -47,6 +48,7 @@ impl AccountantSkel { last_id, ledger: vec![], writer: w, + subscribers: vec![], } } @@ -54,6 +56,12 @@ impl AccountantSkel { while let Ok(entry) = self.acc.historian.receiver.try_recv() { self.last_id = entry.id; write!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); + + for mut subscriber in &self.subscribers { + // TODO: Handle errors. If TCP stream is closed, remove it. + serialize_into(subscriber, &entry).unwrap(); + } + self.ledger.push(entry); } self.last_id @@ -92,8 +100,9 @@ impl AccountantSkel { }), } } + fn process( - &mut self, + obj: &Arc>>, r_reader: &streamer::Receiver, s_responder: &streamer::Responder, packet_recycler: &streamer::PacketRecycler, @@ -110,7 +119,7 @@ impl AccountantSkel { for packet in &msgs.read().unwrap().packets { let sz = packet.meta.size; let req = deserialize(&packet.data[0..sz])?; - if let Some(resp) = self.process_request(req) { + if let Some(resp) = obj.lock().unwrap().process_request(req) { if ursps.responses.len() <= num { ursps .responses @@ -153,21 +162,30 @@ impl AccountantSkel { let t_responder = streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder); - let t_server = spawn(move || { - if let Ok(me) = Arc::try_unwrap(obj) { - loop { - let e = me.lock().unwrap().process( - &r_reader, - &s_responder, - &packet_recycler, - &response_recycler, - ); - if e.is_err() && exit.load(Ordering::Relaxed) { - break; - } + let skel = obj.clone(); + let t_server = spawn(move || loop { + let e = AccountantSkel::process( + &skel, + &r_reader, + &s_responder, + &packet_recycler, + &response_recycler, + ); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }); + + let listener = TcpListener::bind(addr)?; + let t_listener = spawn(move || { + for stream in listener.incoming() { + match stream { + Ok(stream) => obj.lock().unwrap().subscribers.push(stream), + Err(_) => break, } } }); - Ok(vec![t_receiver, t_responder, t_server]) + + Ok(vec![t_receiver, t_responder, t_server, t_listener]) } } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 4336cdd0f..345b76165 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -3,24 +3,26 @@ //! transfer funds to other users. use accountant_skel::{Request, Response}; -use bincode::{deserialize, serialize}; +use bincode::{deserialize, serialize, serialized_size}; use entry::Entry; use hash::Hash; use signature::{KeyPair, PublicKey, Signature}; -use std::io; -use std::net::UdpSocket; +use std::io::{self, Read}; +use std::net::{TcpStream, UdpSocket}; use transaction::Transaction; pub struct AccountantStub { pub addr: String, pub socket: UdpSocket, + pub stream: TcpStream, } impl AccountantStub { - pub fn new(addr: &str, socket: UdpSocket) -> Self { + pub fn new(addr: &str, socket: UdpSocket, stream: TcpStream) -> Self { AccountantStub { addr: addr.to_string(), socket, + stream, } } @@ -79,25 +81,29 @@ impl AccountantStub { last_id: &Hash, ) -> io::Result<(bool, Hash)> { let mut last_id = *last_id; - let req = Request::GetEntries { last_id }; - let data = serialize(&req).unwrap(); - self.socket.send_to(&data, &self.addr).map(|_| ())?; - let mut buf = vec![0u8; 65_535]; - self.socket.recv_from(&mut buf)?; - let resp = deserialize(&buf).expect("deserialize signature"); + let mut buf_offset = 0; let mut found = false; - if let Response::Entries { entries } = resp { - for Entry { id, events, .. } in entries { - last_id = id; - if !found { - for event in events { - if let Some(sig) = event.get_signature() { - if sig == *wait_sig { - found = true; + if let Ok(bytes) = self.stream.read(&mut buf) { + loop { + match deserialize::(&buf[buf_offset..]) { + Ok(entry) => { + buf_offset += serialized_size(&entry).unwrap() as usize; + last_id = entry.id; + if !found { + for event in entry.events { + if let Some(sig) = event.get_signature() { + if sig == *wait_sig { + found = true; + } + } } } } + Err(_) => { + println!("read {} of {} in buf", buf_offset, bytes); + break; + } } } } @@ -112,6 +118,9 @@ impl AccountantStub { let ret = self.check_on_signature(wait_sig, &last_id)?; found = ret.0; last_id = ret.1; + + // Clunky way to force a sync in the skel. + self.get_last_id()?; } Ok(last_id) } @@ -139,19 +148,19 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let acc = Arc::new(Mutex::new(AccountantSkel::new(acc, sink()))); - let threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap(); + let _threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap(); sleep(Duration::from_millis(300)); let socket = UdpSocket::bind(send_addr).unwrap(); - let mut acc = AccountantStub::new(addr, socket); + let stream = TcpStream::connect(addr).expect("tcp connect"); + stream.set_nonblocking(true).expect("nonblocking"); + + let mut acc = AccountantStub::new(addr, socket, stream); let last_id = acc.get_last_id().unwrap(); let sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); acc.wait_on_signature(&sig, &last_id).unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500); exit.store(true, Ordering::Relaxed); - for t in threads { - t.join().expect("join"); - } } } diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 42916f84d..9b6515df7 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -6,7 +6,7 @@ use silk::mint::Mint; use silk::signature::{KeyPair, KeyPairUtil}; use silk::transaction::Transaction; use std::io::stdin; -use std::net::UdpSocket; +use std::net::{TcpStream, UdpSocket}; use std::time::Instant; fn main() { @@ -18,7 +18,8 @@ fn main() { let mint_pubkey = mint.pubkey(); let socket = UdpSocket::bind(send_addr).unwrap(); - let mut acc = AccountantStub::new(addr, socket); + let stream = TcpStream::connect(send_addr).unwrap(); + let mut acc = AccountantStub::new(addr, socket, stream); let last_id = acc.get_last_id().unwrap(); let txs = acc.get_balance(&mint_pubkey).unwrap().unwrap();