diff --git a/src/accountant.rs b/src/accountant.rs index 2892c39737..ab2e3adc8f 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -2,7 +2,7 @@ //! event log to record transactions. Its users can deposit funds and //! transfer funds to other users. -use log::{Event, PublicKey, Sha256Hash, Signature}; +use log::{Entry, Event, PublicKey, Sha256Hash, Signature}; use historian::Historian; use ring::signature::Ed25519KeyPair; use std::sync::mpsc::{RecvError, SendError}; @@ -11,6 +11,7 @@ use std::collections::HashMap; pub struct Accountant { pub historian: Historian, pub balances: HashMap, + pub signatures: HashMap, pub end_hash: Sha256Hash, } @@ -20,13 +21,19 @@ impl Accountant { Accountant { historian: hist, balances: HashMap::new(), + signatures: HashMap::new(), end_hash: *start_hash, } } pub fn process_event(self: &mut Self, event: &Event) { match *event { - Event::Claim { key, data, .. } => { + Event::Claim { key, data, sig } => { + if self.signatures.contains_key(&sig) { + return; + } + self.signatures.insert(sig, true); + if self.balances.contains_key(&key) { if let Some(x) = self.balances.get_mut(&key) { *x += data; @@ -35,7 +42,16 @@ impl Accountant { self.balances.insert(key, data); } } - Event::Transaction { from, to, data, .. } => { + Event::Transaction { + from, + to, + data, + sig, + } => { + if self.signatures.contains_key(&sig) { + return; + } + self.signatures.insert(sig, true); if let Some(x) = self.balances.get_mut(&from) { *x -= data; } @@ -51,7 +67,7 @@ impl Accountant { } } - pub fn sync(self: &mut Self) { + pub fn sync(self: &mut Self) -> Vec> { let mut entries = vec![]; while let Ok(entry) = self.historian.receiver.try_recv() { entries.push(entry); @@ -67,6 +83,8 @@ impl Accountant { for e in &entries { self.process_event(&e.event); } + + entries } pub fn deposit_signed( @@ -83,11 +101,11 @@ impl Accountant { self: &Self, n: u64, keypair: &Ed25519KeyPair, - ) -> Result<(), SendError>> { + ) -> Result>> { use log::{get_pubkey, sign_serialized}; let key = get_pubkey(keypair); let sig = sign_serialized(&n, keypair); - self.deposit_signed(key, n, sig) + self.deposit_signed(key, n, sig).map(|_| sig) } pub fn transfer_signed( @@ -116,25 +134,41 @@ impl Accountant { n: u64, keypair: &Ed25519KeyPair, to: PublicKey, - ) -> Result<(), SendError>> { + ) -> Result>> { use log::{get_pubkey, sign_transaction_data}; let from = get_pubkey(keypair); let sig = sign_transaction_data(&n, keypair, &to); - self.transfer_signed(from, to, n, sig) + self.transfer_signed(from, to, n, sig).map(|_| sig) } pub fn get_balance(self: &mut Self, pubkey: &PublicKey) -> Result { self.sync(); Ok(*self.balances.get(pubkey).unwrap_or(&0)) } + + pub fn wait_on_signature(self: &mut Self, wait_sig: &Signature) { + use std::thread::sleep; + use std::time::Duration; + let mut entries = self.sync(); + let mut found = false; + while !found { + found = entries.iter().any(|e| match e.event { + Event::Claim { sig, .. } => sig == *wait_sig, + Event::Transaction { sig, .. } => sig == *wait_sig, + _ => false, + }); + if !found { + sleep(Duration::from_millis(30)); + entries = self.sync(); + } + } + } } #[cfg(test)] mod tests { use super::*; - use std::thread::sleep; - use std::time::Duration; use log::{generate_keypair, get_pubkey}; use historian::ExitReason; @@ -145,13 +179,13 @@ mod tests { let alice_keypair = generate_keypair(); let bob_keypair = generate_keypair(); acc.deposit(10_000, &alice_keypair).unwrap(); - acc.deposit(1_000, &bob_keypair).unwrap(); + let sig = acc.deposit(1_000, &bob_keypair).unwrap(); + acc.wait_on_signature(&sig); - sleep(Duration::from_millis(30)); let bob_pubkey = get_pubkey(&bob_keypair); - acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); + let sig = acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); + acc.wait_on_signature(&sig); - sleep(Duration::from_millis(30)); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); drop(acc.historian.sender); @@ -163,18 +197,20 @@ mod tests { #[test] fn test_invalid_transfer() { + use std::thread::sleep; + use std::time::Duration; let zero = Sha256Hash::default(); let mut acc = Accountant::new(&zero, Some(2)); let alice_keypair = generate_keypair(); let bob_keypair = generate_keypair(); acc.deposit(10_000, &alice_keypair).unwrap(); - acc.deposit(1_000, &bob_keypair).unwrap(); + let sig = acc.deposit(1_000, &bob_keypair).unwrap(); + acc.wait_on_signature(&sig); - sleep(Duration::from_millis(30)); let bob_pubkey = get_pubkey(&bob_keypair); acc.transfer(10_001, &alice_keypair, bob_pubkey).unwrap(); - sleep(Duration::from_millis(30)); + let alice_pubkey = get_pubkey(&alice_keypair); assert_eq!(acc.get_balance(&alice_pubkey).unwrap(), 10_000); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); @@ -192,10 +228,10 @@ mod tests { let mut acc = Accountant::new(&zero, Some(2)); let keypair = generate_keypair(); acc.deposit(1, &keypair).unwrap(); - acc.deposit(2, &keypair).unwrap(); + let sig = acc.deposit(2, &keypair).unwrap(); + acc.wait_on_signature(&sig); let pubkey = get_pubkey(&keypair); - sleep(Duration::from_millis(30)); assert_eq!(acc.get_balance(&pubkey).unwrap(), 3); drop(acc.historian.sender); @@ -211,13 +247,12 @@ mod tests { let mut acc = Accountant::new(&zero, Some(2)); let alice_keypair = generate_keypair(); let bob_keypair = generate_keypair(); - acc.deposit(10_000, &alice_keypair).unwrap(); + let sig = acc.deposit(10_000, &alice_keypair).unwrap(); + acc.wait_on_signature(&sig); - sleep(Duration::from_millis(30)); let bob_pubkey = get_pubkey(&bob_keypair); - acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); - - sleep(Duration::from_millis(30)); + let sig = acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); + acc.wait_on_signature(&sig); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500); drop(acc.historian.sender); diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 18419896cd..0573299a78 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -23,11 +23,15 @@ pub enum Request { GetBalance { key: PublicKey, }, + Wait { + sig: Signature, + }, } #[derive(Serialize, Deserialize, Debug)] pub enum Response { Balance { key: PublicKey, val: u64 }, + Confirmed { sig: Signature }, } impl AccountantSkel { @@ -49,26 +53,28 @@ impl AccountantSkel { let val = self.obj.get_balance(&key).unwrap(); Some(Response::Balance { key, val }) } + Request::Wait { sig } => { + self.obj.wait_on_signature(&sig); + Some(Response::Confirmed { sig }) + } } } - /// TCP Server that forwards messages to Accountant methods. + /// UDP Server that forwards messages to Accountant methods. pub fn serve(self: &mut Self, addr: &str) -> io::Result<()> { - use std::net::TcpListener; - use std::io::{Read, Write}; + use std::net::UdpSocket; use bincode::{deserialize, serialize}; - let listener = TcpListener::bind(addr)?; + let socket = UdpSocket::bind(addr)?; let mut buf = vec![0u8; 1024]; loop { - //println!("skel: Waiting for incoming connections..."); - let (mut stream, _from_addr) = listener.accept()?; - let _sz = stream.read(&mut buf)?; + //println!("skel: Waiting for incoming packets..."); + let (_sz, src) = socket.recv_from(&mut buf)?; // TODO: Return a descriptive error message if deserialization fails. let req = deserialize(&buf).expect("deserialize request"); if let Some(resp) = self.process_request(req) { - stream.write(&serialize(&resp).expect("serialize response"))?; + socket.send_to(&serialize(&resp).expect("serialize response"), &src)?; } } } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 5c5749b1bb..ac1945fa62 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -2,9 +2,8 @@ //! event log to record transactions. Its users can deposit funds and //! transfer funds to other users. -use std::net::TcpStream; +use std::net::UdpSocket; use std::io; -use std::io::{Read, Write}; use bincode::{deserialize, serialize}; use log::{PublicKey, Signature}; use ring::signature::Ed25519KeyPair; @@ -12,12 +11,14 @@ use accountant_skel::{Request, Response}; pub struct AccountantStub { pub addr: String, + pub socket: UdpSocket, } impl AccountantStub { - pub fn new(addr: &str) -> Self { + pub fn new(addr: &str, socket: UdpSocket) -> Self { AccountantStub { addr: addr.to_string(), + socket, } } @@ -29,15 +30,14 @@ impl AccountantStub { ) -> io::Result { let req = Request::Deposit { key, val, sig }; let data = serialize(&req).unwrap(); - let mut stream = TcpStream::connect(&self.addr)?; - stream.write(&data) + self.socket.send_to(&data, &self.addr) } - pub fn deposit(self: &mut Self, n: u64, keypair: &Ed25519KeyPair) -> io::Result { + pub fn deposit(self: &mut Self, n: u64, keypair: &Ed25519KeyPair) -> io::Result { use log::{get_pubkey, sign_serialized}; let key = get_pubkey(keypair); let sig = sign_serialized(&n, keypair); - self.deposit_signed(key, n, sig) + self.deposit_signed(key, n, sig).map(|_| sig) } pub fn transfer_signed( @@ -49,8 +49,7 @@ impl AccountantStub { ) -> io::Result { let req = Request::Transfer { from, to, val, sig }; let data = serialize(&req).unwrap(); - let mut stream = TcpStream::connect(&self.addr)?; - stream.write(&data) + self.socket.send_to(&data, &self.addr) } pub fn transfer( @@ -58,24 +57,39 @@ impl AccountantStub { n: u64, keypair: &Ed25519KeyPair, to: PublicKey, - ) -> io::Result { + ) -> io::Result { use log::{get_pubkey, sign_transaction_data}; let from = get_pubkey(keypair); let sig = sign_transaction_data(&n, keypair, &to); - self.transfer_signed(from, to, n, sig) + self.transfer_signed(from, to, n, sig).map(|_| sig) } pub fn get_balance(self: &mut Self, pubkey: &PublicKey) -> io::Result { - let mut stream = TcpStream::connect(&self.addr)?; let req = Request::GetBalance { key: *pubkey }; let data = serialize(&req).expect("serialize GetBalance"); - stream.write(&data)?; + self.socket.send_to(&data, &self.addr)?; let mut buf = vec![0u8; 1024]; - stream.read(&mut buf)?; + self.socket.recv_from(&mut buf)?; let resp = deserialize(&buf).expect("deserialize balance"); - let Response::Balance { key, val } = resp; - assert_eq!(key, *pubkey); - Ok(val) + if let Response::Balance { key, val } = resp { + assert_eq!(key, *pubkey); + return Ok(val); + } + Ok(0) + } + + pub fn wait_on_signature(self: &mut Self, wait_sig: &Signature) -> io::Result<()> { + let req = Request::Wait { sig: *wait_sig }; + let data = serialize(&req).unwrap(); + self.socket.send_to(&data, &self.addr).map(|_| ())?; + + let mut buf = vec![0u8; 1024]; + self.socket.recv_from(&mut buf)?; + let resp = deserialize(&buf).expect("deserialize signature"); + if let Response::Confirmed { sig } = resp { + assert_eq!(sig, *wait_sig); + } + Ok(()) } } @@ -90,7 +104,8 @@ mod tests { #[test] fn test_accountant_stub() { - let addr = "127.0.0.1:8000"; + let addr = "127.0.0.1:9000"; + let send_addr = "127.0.0.1:9001"; spawn(move || { let zero = Sha256Hash::default(); let acc = Accountant::new(&zero, None); @@ -100,17 +115,17 @@ mod tests { sleep(Duration::from_millis(30)); - let mut acc = AccountantStub::new(addr); + let socket = UdpSocket::bind(send_addr).unwrap(); + let mut acc = AccountantStub::new(addr, socket); let alice_keypair = generate_keypair(); let bob_keypair = generate_keypair(); acc.deposit(10_000, &alice_keypair).unwrap(); - acc.deposit(1_000, &bob_keypair).unwrap(); + let sig = acc.deposit(1_000, &bob_keypair).unwrap(); + acc.wait_on_signature(&sig).unwrap(); - sleep(Duration::from_millis(30)); let bob_pubkey = get_pubkey(&bob_keypair); - acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); - - sleep(Duration::from_millis(300)); + let sig = acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); + acc.wait_on_signature(&sig).unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); } } diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index e2ea5e56be..cbc0d6dd30 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -2,44 +2,39 @@ extern crate silk; fn main() { use silk::accountant_stub::AccountantStub; - use std::thread::sleep; - use std::time::Duration; + use std::time::Instant; + use std::net::UdpSocket; use silk::log::{generate_keypair, get_pubkey}; let addr = "127.0.0.1:8000"; - let mut acc = AccountantStub::new(addr); + let send_addr = "127.0.0.1:8001"; + let socket = UdpSocket::bind(send_addr).unwrap(); + let mut acc = AccountantStub::new(addr, socket); let alice_keypair = generate_keypair(); - let bob_keypair = generate_keypair(); + let alice_pubkey = get_pubkey(&alice_keypair); let txs = 10_000; println!("Depositing {} units in Alice's account...", txs); - acc.deposit(txs, &alice_keypair).unwrap(); - //acc.deposit(1_000, &bob_keypair).unwrap(); + let sig = acc.deposit(txs, &alice_keypair).unwrap(); + acc.wait_on_signature(&sig).unwrap(); + assert_eq!(acc.get_balance(&alice_pubkey).unwrap(), txs); println!("Done."); - sleep(Duration::from_millis(30)); - let alice_pubkey = get_pubkey(&alice_keypair); - let bob_pubkey = get_pubkey(&bob_keypair); println!("Transferring 1 unit {} times...", txs); + let now = Instant::now(); + let mut sig = sig; for _ in 0..txs { - acc.transfer(1, &alice_keypair, bob_pubkey).unwrap(); + let bob_keypair = generate_keypair(); + let bob_pubkey = get_pubkey(&bob_keypair); + sig = acc.transfer(1, &alice_keypair, bob_pubkey).unwrap(); } - println!("Done."); + println!("Waiting for last transaction to be confirmed...",); + acc.wait_on_signature(&sig).unwrap(); - sleep(Duration::from_millis(20)); - let mut alice_val = acc.get_balance(&alice_pubkey).unwrap(); - while alice_val > 0 { - println!("Checking on Alice's Balance {}", alice_val); - sleep(Duration::from_millis(20)); - alice_val = acc.get_balance(&alice_pubkey).unwrap(); - } - println!("Done. Checking balances."); - println!( - "Alice's Final Balance {}", - acc.get_balance(&alice_pubkey).unwrap() - ); - - println!( - "Bob's Final Balance {}", - acc.get_balance(&bob_pubkey).unwrap() - ); + let duration = now.elapsed(); + let ns = duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64; + let tps = (txs * 1_000_000_000) as f64 / ns as f64; + println!("Done. {} tps!", tps); + let val = acc.get_balance(&alice_pubkey).unwrap(); + println!("Alice's Final Balance {}", val); + assert_eq!(val, 0); } diff --git a/src/historian.rs b/src/historian.rs index 99e2ff6006..1606080ed1 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -116,8 +116,8 @@ pub fn create_logger( impl Historian { pub fn new(start_hash: &Sha256Hash, ms_per_tick: Option) -> Self { use std::sync::mpsc::sync_channel; - let (sender, event_receiver) = sync_channel(4000); - let (entry_sender, receiver) = sync_channel(4000); + let (sender, event_receiver) = sync_channel(1000); + let (entry_sender, receiver) = sync_channel(1000); let thread_hdl = create_logger(*start_hash, ms_per_tick, event_receiver, entry_sender); Historian { sender,