From 8295cc11c0f46648d24e1e7cbb0fd885c09002c7 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 20 Mar 2018 23:15:44 -0600 Subject: [PATCH 1/3] Move JSON printing up the stack --- src/accountant.rs | 35 ++++++++++++++++------------------- src/accountant_skel.rs | 17 ++++++++++++++--- src/historian.rs | 12 ++++-------- src/recorder.rs | 37 ++++++++++++++++++++----------------- 4 files changed, 54 insertions(+), 47 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index aaf206e34..7f2e65fa9 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -44,7 +44,6 @@ pub struct Accountant { pub historian: Historian, pub balances: HashMap, pub first_id: Hash, - pub last_id: Hash, pending: HashMap, time_sources: HashSet, last_time: DateTime, @@ -67,7 +66,6 @@ impl Accountant { historian: hist, balances: HashMap::new(), first_id: start_hash, - last_id: start_hash, pending: HashMap::new(), time_sources: HashSet::new(), last_time: Utc.timestamp(0, 0), @@ -91,13 +89,6 @@ impl Accountant { Self::new_from_entries(mint.create_entries(), ms_per_tick) } - pub fn sync(self: &mut Self) -> Hash { - while let Ok(entry) = self.historian.receiver.try_recv() { - self.last_id = entry.id; - } - self.last_id - } - fn is_deposit(allow_deposits: bool, from: &PublicKey, plan: &Plan) -> bool { if let Plan::Pay(ref payment) = *plan { allow_deposits && *from == payment.to @@ -210,8 +201,9 @@ impl Accountant { n: i64, keypair: &KeyPair, to: PublicKey, + last_id: Hash, ) -> Result { - let tr = Transaction::new(keypair, to, n, self.last_id); + let tr = Transaction::new(keypair, to, n, last_id); let sig = tr.sig; self.process_transaction(tr).map(|_| sig) } @@ -222,8 +214,9 @@ impl Accountant { keypair: &KeyPair, to: PublicKey, dt: DateTime, + last_id: Hash, ) -> Result { - let tr = Transaction::new_on_date(keypair, to, dt, n, self.last_id); + let tr = Transaction::new_on_date(keypair, to, dt, n, last_id); let sig = tr.sig; self.process_transaction(tr).map(|_| sig) } @@ -244,10 +237,12 @@ mod tests { let alice = Mint::new(10_000); let bob_pubkey = KeyPair::new().pubkey(); let mut acc = Accountant::new(&alice, Some(2)); - acc.transfer(1_000, &alice.keypair(), bob_pubkey).unwrap(); + acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed()) + .unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); - acc.transfer(500, &alice.keypair(), bob_pubkey).unwrap(); + acc.transfer(500, &alice.keypair(), bob_pubkey, alice.seed()) + .unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); drop(acc.historian.sender); @@ -262,9 +257,10 @@ mod tests { let alice = Mint::new(11_000); let mut acc = Accountant::new(&alice, Some(2)); let bob_pubkey = KeyPair::new().pubkey(); - acc.transfer(1_000, &alice.keypair(), bob_pubkey).unwrap(); + acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed()) + .unwrap(); assert_eq!( - acc.transfer(10_001, &alice.keypair(), bob_pubkey), + acc.transfer(10_001, &alice.keypair(), bob_pubkey, alice.seed()), Err(AccountingError::InsufficientFunds) ); @@ -309,7 +305,8 @@ mod tests { let mut acc = Accountant::new(&alice, Some(2)); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); - acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); + acc.transfer(500, &alice_keypair, bob_pubkey, alice.seed()) + .unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500); drop(acc.historian.sender); @@ -326,7 +323,7 @@ mod tests { let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); - acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt) + acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.seed()) .unwrap(); // Alice's balance will be zero because all funds are locked up. @@ -355,7 +352,7 @@ mod tests { acc.process_verified_timestamp(alice.pubkey(), dt).unwrap(); // It's now past now, so this transfer should be processed immediately. - acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt) + acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.seed()) .unwrap(); assert_eq!(acc.get_balance(&alice.pubkey()), Some(0)); @@ -369,7 +366,7 @@ mod tests { let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); - let sig = acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt) + let sig = acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.seed()) .unwrap(); // Alice's balance will be zero because all funds are locked up. diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 58729e612..2b114b66d 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -12,9 +12,11 @@ use std::time::Duration; use std::sync::mpsc::channel; use std::thread::{spawn, JoinHandle}; use std::default::Default; +use serde_json; pub struct AccountantSkel { pub acc: Accountant, + pub last_id: Hash, } #[derive(Serialize, Deserialize, Debug)] @@ -34,7 +36,16 @@ pub enum Response { impl AccountantSkel { pub fn new(acc: Accountant) -> Self { - AccountantSkel { acc } + let last_id = acc.first_id; + AccountantSkel { acc, last_id } + } + + pub fn sync(self: &mut Self) -> Hash { + while let Ok(entry) = self.acc.historian.receiver.try_recv() { + self.last_id = entry.id; + println!("{}", serde_json::to_string(&entry).unwrap()); + } + self.last_id } pub fn process_request(self: &mut Self, msg: Request) -> Option { @@ -52,8 +63,8 @@ impl AccountantSkel { Request::GetEntries { .. } => Some(Response::Entries { entries: vec![] }), Request::GetId { is_last } => Some(Response::Id { id: if is_last { - self.acc.sync(); - self.acc.last_id + self.sync(); + self.last_id } else { self.acc.first_id }, diff --git a/src/historian.rs b/src/historian.rs index 12965c313..a6a3b94de 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -5,7 +5,7 @@ use std::thread::{spawn, JoinHandle}; use std::collections::HashSet; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::time::Instant; -use hash::{hash, Hash}; +use hash::Hash; use entry::Entry; use recorder::{ExitReason, Recorder, Signal}; use signature::Signature; @@ -48,8 +48,7 @@ impl Historian { return err; } if ms_per_tick.is_some() { - recorder.last_id = hash(&recorder.last_id); - recorder.num_hashes += 1; + recorder.hash(); } } }) @@ -127,12 +126,9 @@ mod tests { hist.sender.send(Signal::Tick).unwrap(); drop(hist.sender); let entries: Vec = hist.receiver.iter().collect(); + assert!(entries.len() > 1); - // Ensure one entry is sent back for each tick sent in. - assert_eq!(entries.len(), 1); - - // Ensure the ID is not the seed, which indicates another Tick - // was recorded before the one we sent. + // Ensure the ID is not the seed. assert_ne!(entries[0].id, zero); } } diff --git a/src/recorder.rs b/src/recorder.rs index a90650d96..0a5a0bc14 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -8,10 +8,9 @@ use std::sync::mpsc::{Receiver, SyncSender, TryRecvError}; use std::time::{Duration, Instant}; use std::mem; -use hash::Hash; +use hash::{hash, Hash}; use entry::{create_entry_mut, Entry}; use event::Event; -use serde_json; pub enum Signal { Tick, @@ -25,12 +24,12 @@ pub enum ExitReason { } pub struct Recorder { - pub sender: SyncSender, - pub receiver: Receiver, - pub last_id: Hash, - pub events: Vec, - pub num_hashes: u64, - pub num_ticks: u64, + sender: SyncSender, + receiver: Receiver, + last_hash: Hash, + events: Vec, + num_hashes: u64, + num_ticks: u64, } impl Recorder { @@ -38,18 +37,25 @@ impl Recorder { Recorder { receiver, sender, - last_id: start_hash, + last_hash: start_hash, events: vec![], num_hashes: 0, num_ticks: 0, } } - pub fn record_entry(&mut self) -> Result { + pub fn hash(&mut self) { + self.last_hash = hash(&self.last_hash); + self.num_hashes += 1; + } + + pub fn record_entry(&mut self) -> Result<(), ExitReason> { let events = mem::replace(&mut self.events, vec![]); - let entry = create_entry_mut(&mut self.last_id, &mut self.num_hashes, events); - println!("{}", serde_json::to_string(&entry).unwrap()); - Ok(entry) + let entry = create_entry_mut(&mut self.last_hash, &mut self.num_hashes, events); + self.sender + .send(entry) + .or(Err(ExitReason::SendDisconnected))?; + Ok(()) } pub fn process_events( @@ -68,10 +74,7 @@ impl Recorder { match self.receiver.try_recv() { Ok(signal) => match signal { Signal::Tick => { - let entry = self.record_entry()?; - self.sender - .send(entry) - .or(Err(ExitReason::SendDisconnected))?; + self.record_entry()?; } Signal::Event(event) => { self.events.push(event); From 9f232bac5894f8a67e1ccdb624452d4f787fc13b Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 21 Mar 2018 15:43:39 -0600 Subject: [PATCH 2/3] Allow clients to sync the ledger Fixes #4 --- src/accountant_skel.rs | 23 +++++++++++++++++++---- src/accountant_stub.rs | 22 ++++++---------------- src/bin/client-demo.rs | 2 +- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 2b114b66d..885f05a2b 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -17,6 +17,7 @@ use serde_json; pub struct AccountantSkel { pub acc: Accountant, pub last_id: Hash, + pub ledger: Vec, } #[derive(Serialize, Deserialize, Debug)] @@ -37,13 +38,18 @@ pub enum Response { impl AccountantSkel { pub fn new(acc: Accountant) -> Self { let last_id = acc.first_id; - AccountantSkel { acc, last_id } + AccountantSkel { + acc, + last_id, + ledger: vec![], + } } pub fn sync(self: &mut Self) -> Hash { while let Ok(entry) = self.acc.historian.receiver.try_recv() { self.last_id = entry.id; println!("{}", serde_json::to_string(&entry).unwrap()); + self.ledger.push(entry); } self.last_id } @@ -60,11 +66,20 @@ impl AccountantSkel { let val = self.acc.get_balance(&key); Some(Response::Balance { key, val }) } - Request::GetEntries { .. } => Some(Response::Entries { entries: vec![] }), + Request::GetEntries { last_id } => { + self.sync(); + let entries = self.ledger + .iter() + .skip_while(|x| x.id != last_id) // log(n) way to find Entry with id == last_id. + .skip(1) // Skip the entry with last_id. + .take(256) // TODO: Take while the serialized entries fit into a 64k UDP packet. + .cloned() + .collect(); + Some(Response::Entries { entries }) + } Request::GetId { is_last } => Some(Response::Id { id: if is_last { - self.sync(); - self.last_id + self.sync() } else { self.acc.first_id }, diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index d02371677..e3db7e41c 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -14,7 +14,6 @@ use accountant_skel::{Request, Response}; pub struct AccountantStub { pub addr: String, pub socket: UdpSocket, - pub last_id: Option, } impl AccountantStub { @@ -22,7 +21,6 @@ impl AccountantStub { AccountantStub { addr: addr.to_string(), socket, - last_id: None, } } @@ -75,16 +73,8 @@ impl AccountantStub { self.get_id(true) } - pub fn wait_on_signature(&mut self, wait_sig: &Signature) -> io::Result<()> { - let last_id = match self.last_id { - None => { - let first_id = self.get_id(false)?; - self.last_id = Some(first_id); - first_id - } - Some(last_id) => last_id, - }; - + pub fn wait_on_signature(&mut self, wait_sig: &Signature, last_id: &Hash) -> io::Result { + let mut last_id = *last_id; let req = Request::GetEntries { last_id }; let data = serialize(&req).unwrap(); self.socket.send_to(&data, &self.addr).map(|_| ())?; @@ -94,11 +84,11 @@ impl AccountantStub { let resp = deserialize(&buf).expect("deserialize signature"); if let Response::Entries { entries } = resp { for Entry { id, events, .. } in entries { - self.last_id = Some(id); + last_id = id; for event in events { if let Some(sig) = event.get_signature() { if sig == *wait_sig { - return Ok(()); + return Ok(last_id); } } } @@ -106,7 +96,7 @@ impl AccountantStub { } // TODO: Loop until we found it. - Ok(()) + Ok(last_id) } } @@ -138,7 +128,7 @@ mod tests { let last_id = acc.get_last_id().unwrap(); let sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); - acc.wait_on_signature(&sig).unwrap(); + acc.wait_on_signature(&sig, &last_id).unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500); *exit.lock().unwrap() = true; for t in threads.iter() { diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 7a3f4b4fd..18f4e6556 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -65,7 +65,7 @@ fn main() { acc.transfer_signed(tr).unwrap(); } println!("Waiting for last transaction to be confirmed...",); - acc.wait_on_signature(&sig).unwrap(); + acc.wait_on_signature(&sig, &last_id).unwrap(); let duration = now.elapsed(); let ns = duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64; From 8ea97141eab487224a96a46bd45bc36349c33281 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 21 Mar 2018 17:15:32 -0600 Subject: [PATCH 3/3] Update the test to replicate the ledger --- src/accountant_stub.rs | 33 +++++++++++++++++++++++++-------- src/streamer.rs | 2 +- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index e3db7e41c..0c4f1082c 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -73,29 +73,46 @@ impl AccountantStub { self.get_id(true) } - pub fn wait_on_signature(&mut self, wait_sig: &Signature, last_id: &Hash) -> io::Result { + pub fn check_on_signature( + &mut self, + wait_sig: &Signature, + last_id: &Hash, + ) -> io::Result<(bool, Hash)> { let mut last_id = *last_id; let req = Request::GetEntries { last_id }; let data = serialize(&req).unwrap(); self.socket.send_to(&data, &self.addr).map(|_| ())?; - let mut buf = vec![0u8; 1024]; + let mut buf = vec![0u8; 65_535]; self.socket.recv_from(&mut buf)?; let resp = deserialize(&buf).expect("deserialize signature"); + let mut found = false; if let Response::Entries { entries } = resp { for Entry { id, events, .. } in entries { last_id = id; - for event in events { - if let Some(sig) = event.get_signature() { - if sig == *wait_sig { - return Ok(last_id); + if !found { + for event in events { + if let Some(sig) = event.get_signature() { + if sig == *wait_sig { + found = true; + } } } } } } - // TODO: Loop until we found it. + Ok((found, last_id)) + } + + pub fn wait_on_signature(&mut self, wait_sig: &Signature, last_id: &Hash) -> io::Result { + let mut found = false; + let mut last_id = *last_id; + while !found { + let ret = self.check_on_signature(wait_sig, &last_id)?; + found = ret.0; + last_id = ret.1; + } Ok(last_id) } } @@ -116,7 +133,7 @@ mod tests { let addr = "127.0.0.1:9000"; let send_addr = "127.0.0.1:9001"; let alice = Mint::new(10_000); - let acc = Accountant::new(&alice, None); + let acc = Accountant::new(&alice, Some(30)); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(Mutex::new(false)); let acc = Arc::new(Mutex::new(AccountantSkel::new(acc))); diff --git a/src/streamer.rs b/src/streamer.rs index c1087db94..56e5d2f90 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -7,7 +7,7 @@ use std::thread::{spawn, JoinHandle}; use result::{Error, Result}; const BLOCK_SIZE: usize = 1024 * 8; -pub const PACKET_SIZE: usize = 256; +pub const PACKET_SIZE: usize = 1024; #[derive(Clone)] pub struct Packet {