diff --git a/Cargo.toml b/Cargo.toml index 23f262848..83ac0e595 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "silk" description = "A silky smooth implementation of the Loom architecture" -version = "0.2.3" +version = "0.3.0" documentation = "https://docs.rs/silk" homepage = "http://loomprotocol.com/" repository = "https://github.com/loomprotocol/silk" diff --git a/src/accountant.rs b/src/accountant.rs index 968ae6d3d..2892c3973 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -25,7 +25,6 @@ impl Accountant { } pub fn process_event(self: &mut Self, event: &Event) { - println!("accountant: Processing event: {:?}", event); match *event { Event::Claim { key, data, .. } => { if self.balances.contains_key(&key) { @@ -55,7 +54,6 @@ impl Accountant { pub fn sync(self: &mut Self) { let mut entries = vec![]; while let Ok(entry) = self.historian.receiver.try_recv() { - println!("accountant: got event {:?}", entry.event); entries.push(entry); } // TODO: Does this cause the historian's channel to get blocked? @@ -99,20 +97,17 @@ impl Accountant { data: u64, sig: Signature, ) -> Result<(), SendError>> { - println!("accountant: Checking funds (needs sync)..."); if self.get_balance(&from).unwrap() < data { // TODO: Replace the SendError result with a custom one. println!("Error: Insufficient funds"); return Ok(()); } - println!("accountant: Sufficient funds."); let event = Event::Transaction { from, to, data, sig, }; - println!("accountant: Sending Transaction to historian."); self.historian.sender.send(event) } @@ -130,9 +125,7 @@ impl Accountant { } pub fn get_balance(self: &mut Self, pubkey: &PublicKey) -> Result { - println!("accountant: syncing the log..."); self.sync(); - println!("accountant: done syncing."); Ok(*self.balances.get(pubkey).unwrap_or(&0)) } } diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 7d5c39bee..18419896c 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -42,9 +42,7 @@ impl AccountantSkel { None } Request::Transfer { from, to, val, sig } => { - println!("skel: Invoking transfer_signed..."); let _ = self.obj.transfer_signed(from, to, val, sig); - println!("skel: transfer_signed done."); None } Request::GetBalance { key } => { @@ -62,21 +60,16 @@ impl AccountantSkel { let listener = TcpListener::bind(addr)?; let mut buf = vec![0u8; 1024]; loop { - println!("skel: Waiting for incoming connections..."); - let (mut stream, addr) = listener.accept()?; - println!("skel: Accepted incoming connection frm {:?}.", addr); + //println!("skel: Waiting for incoming connections..."); + let (mut stream, _from_addr) = listener.accept()?; let _sz = stream.read(&mut buf)?; // TODO: Return a descriptive error message if deserialization fails. let req = deserialize(&buf).expect("deserialize request"); - println!("skel: Got request {:?}", req); - println!("skel: Processing request..."); if let Some(resp) = self.process_request(req) { - println!("skel: Writing response..."); stream.write(&serialize(&resp).expect("serialize response"))?; } - println!("skel: Done processing request."); } } } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 10568eee6..5c5749b1b 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -49,13 +49,8 @@ impl AccountantStub { ) -> io::Result { let req = Request::Transfer { from, to, val, sig }; let data = serialize(&req).unwrap(); - println!("TcpStream::connect()..."); let mut stream = TcpStream::connect(&self.addr)?; - println!("Connected."); - println!("accountant_stub: Writing transfer message..."); - let ret = stream.write(&data); - println!("Done."); - ret + stream.write(&data) } pub fn transfer( diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index fd8b6be2d..e2ea5e56b 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -10,18 +10,36 @@ fn main() { let mut acc = AccountantStub::new(addr); let alice_keypair = generate_keypair(); let bob_keypair = generate_keypair(); - println!("Depositing..."); - acc.deposit(10_000, &alice_keypair).unwrap(); - acc.deposit(1_000, &bob_keypair).unwrap(); + let txs = 10_000; + println!("Depositing {} units in Alice's account...", txs); + acc.deposit(txs, &alice_keypair).unwrap(); + //acc.deposit(1_000, &bob_keypair).unwrap(); println!("Done."); sleep(Duration::from_millis(30)); + let alice_pubkey = get_pubkey(&alice_keypair); let bob_pubkey = get_pubkey(&bob_keypair); - println!("Transferring..."); - acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); + println!("Transferring 1 unit {} times...", txs); + for _ in 0..txs { + acc.transfer(1, &alice_keypair, bob_pubkey).unwrap(); + } println!("Done."); - sleep(Duration::from_millis(30)); - println!("Done. Checking balance."); - println!("Balance {}", acc.get_balance(&bob_pubkey).unwrap()); + sleep(Duration::from_millis(20)); + let mut alice_val = acc.get_balance(&alice_pubkey).unwrap(); + while alice_val > 0 { + println!("Checking on Alice's Balance {}", alice_val); + sleep(Duration::from_millis(20)); + alice_val = acc.get_balance(&alice_pubkey).unwrap(); + } + println!("Done. Checking balances."); + println!( + "Alice's Final Balance {}", + acc.get_balance(&alice_pubkey).unwrap() + ); + + println!( + "Bob's Final Balance {}", + acc.get_balance(&bob_pubkey).unwrap() + ); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 8e4adb9f7..4231eda35 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -9,5 +9,6 @@ fn main() { let zero = Sha256Hash::default(); let acc = Accountant::new(&zero, Some(1000)); let mut skel = AccountantSkel::new(acc); + println!("Listening on {}", addr); skel.serve(addr).unwrap(); } diff --git a/src/historian.rs b/src/historian.rs index 0ed4fcbe7..99e2ff600 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -6,14 +6,14 @@ //! The resulting stream of entries represents ordered events in time. use std::thread::JoinHandle; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::{Receiver, SyncSender}; use std::time::{Duration, SystemTime}; use log::{hash, hash_event, verify_event, Entry, Event, Sha256Hash}; use serde::Serialize; use std::fmt::Debug; pub struct Historian { - pub sender: Sender>, + pub sender: SyncSender>, pub receiver: Receiver>, pub thread_hdl: JoinHandle<(Entry, ExitReason)>, } @@ -24,13 +24,12 @@ pub enum ExitReason { SendDisconnected, } fn log_event( - sender: &Sender>, + sender: &SyncSender>, num_hashes: &mut u64, end_hash: &mut Sha256Hash, event: Event, ) -> Result<(), (Entry, ExitReason)> { *end_hash = hash_event(end_hash, &event); - println!("historian: logging event {:?}", event); let entry = Entry { end_hash: *end_hash, num_hashes: *num_hashes, @@ -45,7 +44,7 @@ fn log_event( fn log_events( receiver: &Receiver>, - sender: &Sender>, + sender: &SyncSender>, num_hashes: &mut u64, end_hash: &mut Sha256Hash, epoch: SystemTime, @@ -88,7 +87,7 @@ pub fn create_logger( start_hash: Sha256Hash, ms_per_tick: Option, receiver: Receiver>, - sender: Sender>, + sender: SyncSender>, ) -> JoinHandle<(Entry, ExitReason)> { use std::thread; thread::spawn(move || { @@ -116,9 +115,9 @@ pub fn create_logger( impl Historian { pub fn new(start_hash: &Sha256Hash, ms_per_tick: Option) -> Self { - use std::sync::mpsc::channel; - let (sender, event_receiver) = channel(); - let (entry_sender, receiver) = channel(); + use std::sync::mpsc::sync_channel; + let (sender, event_receiver) = sync_channel(4000); + let (entry_sender, receiver) = sync_channel(4000); let thread_hdl = create_logger(*start_hash, ms_per_tick, event_receiver, entry_sender); Historian { sender,