From 604ccf755277f4467a3c82bef1e2ecc1c22a13ee Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 28 Feb 2018 10:07:54 -0700 Subject: [PATCH 1/4] Add network interface for accountant --- Cargo.toml | 4 ++ src/accountant.rs | 65 ++++++++++++++++------- src/accountant_skel.rs | 72 ++++++++++++++++++++++++++ src/bin/client-demo.rs | 84 ++++++++++++++++++++++++++++++ src/historian.rs | 10 ++-- src/lib.rs | 1 + src/log.rs | 115 +++++++++++++++++++---------------------- 7 files changed, 265 insertions(+), 86 deletions(-) create mode 100644 src/accountant_skel.rs create mode 100644 src/bin/client-demo.rs diff --git a/Cargo.toml b/Cargo.toml index 9523e3f6e..c5bd74829 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,10 @@ license = "Apache-2.0" name = "silk-demo" path = "src/bin/demo.rs" +[[bin]] +name = "silk-client-demo" +path = "src/bin/client-demo.rs" + [badges] codecov = { repository = "loomprotocol/silk", branch = "master", service = "github" } diff --git a/src/accountant.rs b/src/accountant.rs index 857cc8ad2..0e850adbc 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -2,7 +2,7 @@ //! event log to record transactions. Its users can deposit funds and //! transfer funds to other users. -use log::{verify_entry, Event, PublicKey, Sha256Hash}; +use log::{verify_entry, Event, PublicKey, Sha256Hash, Signature}; use historian::Historian; use ring::signature::Ed25519KeyPair; use std::sync::mpsc::{RecvError, SendError}; @@ -60,13 +60,43 @@ impl Accountant { } } + pub fn deposit_signed( + self: &Self, + key: PublicKey, + data: u64, + sig: Signature, + ) -> Result<(), SendError>> { + let event = Event::Claim { key, data, sig }; + self.historian.sender.send(event) + } + pub fn deposit( self: &Self, n: u64, keypair: &Ed25519KeyPair, ) -> Result<(), SendError>> { - use log::sign_hash; - let event = sign_hash(n, &keypair); + use log::{get_pubkey, sign_serialized}; + let key = get_pubkey(keypair); + let sig = sign_serialized(&n, keypair); + self.deposit_signed(key, n, sig) + } + + pub fn transfer_signed( + self: &mut Self, + from: PublicKey, + to: PublicKey, + data: u64, + sig: Signature, + ) -> Result<(), SendError>> { + if self.get_balance(&from).unwrap() < data { + return Ok(()); + } + let event = Event::Transaction { + from, + to, + data, + sig, + }; self.historian.sender.send(event) } @@ -74,17 +104,13 @@ impl Accountant { self: &mut Self, n: u64, keypair: &Ed25519KeyPair, - pubkey: PublicKey, + to: PublicKey, ) -> Result<(), SendError>> { - use log::transfer_hash; - use generic_array::GenericArray; + use log::{get_pubkey, sign_transaction_data}; - let sender_pubkey = GenericArray::clone_from_slice(keypair.public_key_bytes()); - if self.get_balance(&sender_pubkey).unwrap() >= n { - let event = transfer_hash(n, keypair, pubkey); - return self.historian.sender.send(event); - } - Ok(()) + let from = get_pubkey(keypair); + let sig = sign_transaction_data(&n, keypair, &to); + self.transfer_signed(from, to, n, sig) } pub fn get_balance(self: &mut Self, pubkey: &PublicKey) -> Result { @@ -98,9 +124,8 @@ mod tests { use super::*; use std::thread::sleep; use std::time::Duration; - use log::generate_keypair; + use log::{generate_keypair, get_pubkey}; use historian::ExitReason; - use generic_array::GenericArray; #[test] fn test_accountant() { @@ -112,7 +137,7 @@ mod tests { acc.deposit(1_000, &bob_keypair).unwrap(); sleep(Duration::from_millis(30)); - let bob_pubkey = GenericArray::clone_from_slice(bob_keypair.public_key_bytes()); + let bob_pubkey = get_pubkey(&bob_keypair); acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); sleep(Duration::from_millis(30)); @@ -135,11 +160,11 @@ mod tests { acc.deposit(1_000, &bob_keypair).unwrap(); sleep(Duration::from_millis(30)); - let bob_pubkey = GenericArray::clone_from_slice(bob_keypair.public_key_bytes()); + let bob_pubkey = get_pubkey(&bob_keypair); acc.transfer(10_001, &alice_keypair, bob_pubkey).unwrap(); sleep(Duration::from_millis(30)); - let alice_pubkey = GenericArray::clone_from_slice(alice_keypair.public_key_bytes()); + let alice_pubkey = get_pubkey(&alice_keypair); assert_eq!(acc.get_balance(&alice_pubkey).unwrap(), 10_000); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); @@ -151,14 +176,14 @@ mod tests { } #[test] - fn test_mulitple_claims() { + fn test_multiple_claims() { let zero = Sha256Hash::default(); let mut acc = Accountant::new(&zero, Some(2)); let keypair = generate_keypair(); acc.deposit(1, &keypair).unwrap(); acc.deposit(2, &keypair).unwrap(); - let pubkey = GenericArray::clone_from_slice(keypair.public_key_bytes()); + let pubkey = get_pubkey(&keypair); sleep(Duration::from_millis(30)); assert_eq!(acc.get_balance(&pubkey).unwrap(), 3); @@ -178,7 +203,7 @@ mod tests { acc.deposit(10_000, &alice_keypair).unwrap(); sleep(Duration::from_millis(30)); - let bob_pubkey = GenericArray::clone_from_slice(bob_keypair.public_key_bytes()); + let bob_pubkey = get_pubkey(&bob_keypair); acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); sleep(Duration::from_millis(30)); diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs new file mode 100644 index 000000000..60cb36508 --- /dev/null +++ b/src/accountant_skel.rs @@ -0,0 +1,72 @@ +use std::io; +use accountant::Accountant; +use log::{PublicKey, Signature}; +//use serde::Serialize; + +pub struct AccountantSkel { + pub obj: Accountant, +} + +#[derive(Serialize, Deserialize)] +pub enum Request { + Deposit { + key: PublicKey, + val: u64, + sig: Signature, + }, + Transfer { + from: PublicKey, + to: PublicKey, + val: u64, + sig: Signature, + }, + GetBalance { + key: PublicKey, + }, +} + +#[derive(Serialize, Deserialize)] +pub enum Response { + Balance { key: PublicKey, val: u64 }, +} + +impl AccountantSkel { + pub fn process_message(self: &mut Self, msg: Request) -> Option { + match msg { + Request::Deposit { key, val, sig } => { + let _ = self.obj.deposit_signed(key, val, sig); + None + } + Request::Transfer { from, to, val, sig } => { + let _ = self.obj.transfer_signed(from, to, val, sig); + None + } + Request::GetBalance { key } => { + let val = self.obj.get_balance(&key).unwrap(); + Some(Response::Balance { key, val }) + } + } + } + + /// TCP Server that forwards messages to Accountant methods. + pub fn serve(self: &mut Self, addr: &str) -> io::Result<()> { + use std::net::TcpListener; + use std::io::{Read, Write}; + use bincode::{deserialize, serialize}; + let listener = TcpListener::bind(addr)?; + let mut buf = vec![]; + loop { + let (mut stream, addr) = listener.accept()?; + println!("connection received from {}", addr); + + // TODO: Guard against large message DoS attack. + stream.read_to_end(&mut buf)?; + + // TODO: Return a descriptive error message if deserialization fails. + let msg = deserialize(&buf).unwrap(); + if let Some(resp) = self.process_message(msg) { + stream.write(&serialize(&resp).unwrap())?; + } + } + } +} diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs new file mode 100644 index 000000000..003caf6e2 --- /dev/null +++ b/src/bin/client-demo.rs @@ -0,0 +1,84 @@ +extern crate generic_array; +extern crate silk; + +//use log::{Event, PublicKey, Sha256Hash}; +//use std::net::TcpStream; +//use ring::signature::Ed25519KeyPair; +// +//pub struct AccountantStub { +// pub stream: TcpStream, +//} +// +//impl AccountantStub { +// pub fn new(addr: ()) -> Self { +// let mut stream = TcpStream::connect(addr).unwrap(); +// AccountantStub { +// stream: TcpString, +// } +// } +// +// pub fn deposit( +// self: &Self, +// n: u64, +// keypair: &Ed25519KeyPair, +// ) -> Result<(), SendError>> { +// use log::sign_hash; +// let event = sign_hash(n, &keypair); +// self.stream.send(&serialize(event)) +// } +// +// pub fn transfer( +// self: &mut Self, +// n: u64, +// keypair: &Ed25519KeyPair, +// pubkey: PublicKey, +// ) -> io::Result<()> { +// use log::transfer_hash; +// use generic_array::GenericArray; +// let event = transfer_hash(n, &keypair); +// self.stream.send(&serialize(event)) +// } +// +// pub fn get_balance( +// self: &mut Self, +// pubkey: PublicKey, +// ) -> io::Result<()> { +// let event = GetBalance { key: pubkey }; +// self.stream.send(&serialize(event)); +// msg = deserialize(self.sender.recv()); +// if let AccountantMsg::Balance { val } = msg { +// Ok(val) +// } else { +// Err() +// } +// } +//} + +use silk::accountant::Accountant; +use std::thread::sleep; +use std::time::Duration; +use silk::log::{generate_keypair, Sha256Hash}; +use silk::historian::ExitReason; +use generic_array::GenericArray; + +fn main() { + let zero = Sha256Hash::default(); + let mut acc = Accountant::new(&zero, Some(2)); + let alice_keypair = generate_keypair(); + let bob_keypair = generate_keypair(); + acc.deposit(10_000, &alice_keypair).unwrap(); + acc.deposit(1_000, &bob_keypair).unwrap(); + + sleep(Duration::from_millis(30)); + let bob_pubkey = GenericArray::clone_from_slice(bob_keypair.public_key_bytes()); + acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); + + sleep(Duration::from_millis(30)); + assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); + + drop(acc.historian.sender); + assert_eq!( + acc.historian.thread_hdl.join().unwrap().1, + ExitReason::RecvDisconnected + ); +} diff --git a/src/historian.rs b/src/historian.rs index 0daba33fb..80df3ba6f 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -192,11 +192,11 @@ mod tests { let zero = Sha256Hash::default(); let hist = Historian::new(&zero, None); let keypair = generate_keypair(); - let mut event0 = sign_hash(hash(b"hello, world"), &keypair); - if let Event::Claim { key, sig, .. } = event0 { - let data = hash(b"goodbye cruel world"); - event0 = Event::Claim { key, data, sig }; - } + let event0 = Event::Claim { + key: get_pubkey(&keypair), + data: hash(b"goodbye cruel world"), + sig: sign_serialized(&hash(b"hello, world"), &keypair), + }; hist.sender.send(event0).unwrap(); drop(hist.sender); assert_eq!( diff --git a/src/lib.rs b/src/lib.rs index 5aef0ab23..c30052ad9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod log; pub mod historian; pub mod accountant; +pub mod accountant_skel; extern crate bincode; extern crate generic_array; extern crate rayon; diff --git a/src/log.rs b/src/log.rs index 00df286cc..a0e6b739c 100644 --- a/src/log.rs +++ b/src/log.rs @@ -65,7 +65,7 @@ impl Entry { } } -// Return a new ED25519 keypair +/// Return a new ED25519 keypair pub fn generate_keypair() -> Ed25519KeyPair { use ring::{rand, signature}; use untrusted; @@ -74,33 +74,25 @@ pub fn generate_keypair() -> Ed25519KeyPair { signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes)).unwrap() } -/// Return a Claim Event for the given hash and key-pair. -pub fn sign_hash(data: T, keypair: &Ed25519KeyPair) -> Event { - use bincode::serialize; - let sig = keypair.sign(&serialize(&data).unwrap()); - let peer_public_key_bytes = keypair.public_key_bytes(); - let sig_bytes = sig.as_ref(); - Event::Claim { - key: GenericArray::clone_from_slice(peer_public_key_bytes), - data, - sig: GenericArray::clone_from_slice(sig_bytes), - } +/// Return the public key for the given keypair +pub fn get_pubkey(keypair: &Ed25519KeyPair) -> PublicKey { + GenericArray::clone_from_slice(keypair.public_key_bytes()) } -/// Return a Transaction Event that indicates a transfer in ownership of the given hash. -pub fn transfer_hash(data: T, keypair: &Ed25519KeyPair, to: PublicKey) -> Event { +/// Return a signature for the given data using the private key from the given keypair. +pub fn sign_serialized(data: &T, keypair: &Ed25519KeyPair) -> Signature { use bincode::serialize; - let from_public_key_bytes = keypair.public_key_bytes(); - let mut sign_data = serialize(&data).unwrap(); - sign_data.extend_from_slice(&to); - let sig = keypair.sign(&sign_data); - let sig_bytes = sig.as_ref(); - Event::Transaction { - from: GenericArray::clone_from_slice(from_public_key_bytes), - to, - data, - sig: GenericArray::clone_from_slice(sig_bytes), - } + let serialized = serialize(data).unwrap(); + GenericArray::clone_from_slice(keypair.sign(&serialized).as_ref()) +} + +/// Return a signature for the given transaction data using the private key from the given keypair. +pub fn sign_transaction_data( + data: &T, + keypair: &Ed25519KeyPair, + to: &PublicKey, +) -> Signature { + sign_serialized(&(data, to), keypair) } /// Return a Sha256 hash for the given data. @@ -202,8 +194,7 @@ pub fn verify_event(event: &Event) -> bool { sig, } = *event { - let mut sign_data = serialize(&data).unwrap(); - sign_data.extend_from_slice(&to); + let sign_data = serialize(&(&data, &to)).unwrap(); if !verify_signature(&from, &sign_data, &sig) { return false; } @@ -338,7 +329,12 @@ mod tests { #[test] fn test_claim() { let keypair = generate_keypair(); - let event0 = sign_hash(hash(b"hello, world"), &keypair); + let data = hash(b"hello, world"); + let event0 = Event::Claim { + key: get_pubkey(&keypair), + data, + sig: sign_serialized(&data, &keypair), + }; let zero = Sha256Hash::default(); let entries = create_entries(&zero, 0, vec![event0]); assert!(verify_slice(&entries, &zero)); @@ -347,11 +343,11 @@ mod tests { #[test] fn test_wrong_data_claim_attack() { let keypair = generate_keypair(); - let mut event0 = sign_hash(hash(b"hello, world"), &keypair); - if let Event::Claim { key, sig, .. } = event0 { - let data = hash(b"goodbye cruel world"); - event0 = Event::Claim { key, data, sig }; - } + let event0 = Event::Claim { + key: get_pubkey(&keypair), + data: hash(b"goodbye cruel world"), + sig: sign_serialized(&hash(b"hello, world"), &keypair), + }; let zero = Sha256Hash::default(); let entries = create_entries(&zero, 0, vec![event0]); assert!(!verify_slice(&entries, &zero)); @@ -361,8 +357,14 @@ mod tests { fn test_transfer() { let keypair0 = generate_keypair(); let keypair1 = generate_keypair(); - let pubkey1 = GenericArray::clone_from_slice(keypair1.public_key_bytes()); - let event0 = transfer_hash(hash(b"hello, world"), &keypair0, pubkey1); + let pubkey1 = get_pubkey(&keypair1); + let data = hash(b"hello, world"); + let event0 = Event::Transaction { + from: get_pubkey(&keypair0), + to: pubkey1, + data, + sig: sign_transaction_data(&data, &keypair0, &pubkey1), + }; let zero = Sha256Hash::default(); let entries = create_entries(&zero, 0, vec![event0]); assert!(verify_slice(&entries, &zero)); @@ -372,17 +374,14 @@ mod tests { fn test_wrong_data_transfer_attack() { let keypair0 = generate_keypair(); let keypair1 = generate_keypair(); - let pubkey1 = GenericArray::clone_from_slice(keypair1.public_key_bytes()); - let mut event0 = transfer_hash(hash(b"hello, world"), &keypair0, pubkey1); - if let Event::Transaction { from, to, sig, .. } = event0 { - let data = hash(b"goodbye cruel world"); - event0 = Event::Transaction { - from, - to, - data, - sig, - }; - } + let pubkey1 = get_pubkey(&keypair1); + let data = hash(b"hello, world"); + let event0 = Event::Transaction { + from: get_pubkey(&keypair0), + to: pubkey1, + data: hash(b"goodbye cruel world"), // <-- attack! + sig: sign_transaction_data(&data, &keypair0, &pubkey1), + }; let zero = Sha256Hash::default(); let entries = create_entries(&zero, 0, vec![event0]); assert!(!verify_slice(&entries, &zero)); @@ -392,21 +391,15 @@ mod tests { fn test_transfer_hijack_attack() { let keypair0 = generate_keypair(); let keypair1 = generate_keypair(); - let pubkey1 = GenericArray::clone_from_slice(keypair1.public_key_bytes()); - let mut event0 = transfer_hash(hash(b"hello, world"), &keypair0, pubkey1); - if let Event::Transaction { - from, data, sig, .. - } = event0 - { - let theif_keypair = generate_keypair(); - let to = GenericArray::clone_from_slice(theif_keypair.public_key_bytes()); - event0 = Event::Transaction { - from, - to, - data, - sig, - }; - } + let thief_keypair = generate_keypair(); + let pubkey1 = get_pubkey(&keypair1); + let data = hash(b"hello, world"); + let event0 = Event::Transaction { + from: get_pubkey(&keypair0), + to: get_pubkey(&thief_keypair), // <-- attack! + data: hash(b"goodbye cruel world"), + sig: sign_transaction_data(&data, &keypair0, &pubkey1), + }; let zero = Sha256Hash::default(); let entries = create_entries(&zero, 0, vec![event0]); assert!(!verify_slice(&entries, &zero)); From 8299bae2d425b0dd44f5c860e7f49d76ff8ff41b Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 28 Feb 2018 14:16:50 -0700 Subject: [PATCH 2/4] Add accountant stub --- src/accountant.rs | 1 + src/accountant_skel.rs | 22 ++++---- src/accountant_stub.rs | 116 +++++++++++++++++++++++++++++++++++++++++ src/bin/client-demo.rs | 80 ++++------------------------ src/lib.rs | 1 + 5 files changed, 140 insertions(+), 80 deletions(-) create mode 100644 src/accountant_stub.rs diff --git a/src/accountant.rs b/src/accountant.rs index 0e850adbc..6596e7af2 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -89,6 +89,7 @@ impl Accountant { sig: Signature, ) -> Result<(), SendError>> { if self.get_balance(&from).unwrap() < data { + // TODO: Replace the SendError result with a custom one. return Ok(()); } let event = Event::Transaction { diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 60cb36508..008b0e7d4 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -7,7 +7,7 @@ pub struct AccountantSkel { pub obj: Accountant, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub enum Request { Deposit { key: PublicKey, @@ -25,12 +25,16 @@ pub enum Request { }, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub enum Response { Balance { key: PublicKey, val: u64 }, } impl AccountantSkel { + pub fn new(obj: Accountant) -> Self { + AccountantSkel { obj } + } + pub fn process_message(self: &mut Self, msg: Request) -> Option { match msg { Request::Deposit { key, val, sig } => { @@ -54,18 +58,18 @@ impl AccountantSkel { use std::io::{Read, Write}; use bincode::{deserialize, serialize}; let listener = TcpListener::bind(addr)?; - let mut buf = vec![]; + let mut buf = vec![0u8; 1024]; loop { - let (mut stream, addr) = listener.accept()?; - println!("connection received from {}", addr); + let (mut stream, _addr) = listener.accept()?; // TODO: Guard against large message DoS attack. - stream.read_to_end(&mut buf)?; + let _sz = stream.read(&mut buf)?; // TODO: Return a descriptive error message if deserialization fails. - let msg = deserialize(&buf).unwrap(); - if let Some(resp) = self.process_message(msg) { - stream.write(&serialize(&resp).unwrap())?; + let req = deserialize(&buf).expect("deserialize request"); + + if let Some(resp) = self.process_message(req) { + stream.write(&serialize(&resp).expect("serialize response"))?; } } } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs new file mode 100644 index 000000000..5c5749b1b --- /dev/null +++ b/src/accountant_stub.rs @@ -0,0 +1,116 @@ +//! The `accountant` is a client of the `historian`. It uses the historian's +//! event log to record transactions. Its users can deposit funds and +//! transfer funds to other users. + +use std::net::TcpStream; +use std::io; +use std::io::{Read, Write}; +use bincode::{deserialize, serialize}; +use log::{PublicKey, Signature}; +use ring::signature::Ed25519KeyPair; +use accountant_skel::{Request, Response}; + +pub struct AccountantStub { + pub addr: String, +} + +impl AccountantStub { + pub fn new(addr: &str) -> Self { + AccountantStub { + addr: addr.to_string(), + } + } + + pub fn deposit_signed( + self: &mut Self, + key: PublicKey, + val: u64, + sig: Signature, + ) -> io::Result { + let req = Request::Deposit { key, val, sig }; + let data = serialize(&req).unwrap(); + let mut stream = TcpStream::connect(&self.addr)?; + stream.write(&data) + } + + pub fn deposit(self: &mut Self, n: u64, keypair: &Ed25519KeyPair) -> io::Result { + use log::{get_pubkey, sign_serialized}; + let key = get_pubkey(keypair); + let sig = sign_serialized(&n, keypair); + self.deposit_signed(key, n, sig) + } + + pub fn transfer_signed( + self: &mut Self, + from: PublicKey, + to: PublicKey, + val: u64, + sig: Signature, + ) -> io::Result { + let req = Request::Transfer { from, to, val, sig }; + let data = serialize(&req).unwrap(); + let mut stream = TcpStream::connect(&self.addr)?; + stream.write(&data) + } + + pub fn transfer( + self: &mut Self, + n: u64, + keypair: &Ed25519KeyPair, + to: PublicKey, + ) -> io::Result { + use log::{get_pubkey, sign_transaction_data}; + let from = get_pubkey(keypair); + let sig = sign_transaction_data(&n, keypair, &to); + self.transfer_signed(from, to, n, sig) + } + + pub fn get_balance(self: &mut Self, pubkey: &PublicKey) -> io::Result { + let mut stream = TcpStream::connect(&self.addr)?; + let req = Request::GetBalance { key: *pubkey }; + let data = serialize(&req).expect("serialize GetBalance"); + stream.write(&data)?; + let mut buf = vec![0u8; 1024]; + stream.read(&mut buf)?; + let resp = deserialize(&buf).expect("deserialize balance"); + let Response::Balance { key, val } = resp; + assert_eq!(key, *pubkey); + Ok(val) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use accountant::Accountant; + use accountant_skel::AccountantSkel; + use std::thread::{sleep, spawn}; + use std::time::Duration; + use log::{generate_keypair, get_pubkey, Sha256Hash}; + + #[test] + fn test_accountant_stub() { + let addr = "127.0.0.1:8000"; + spawn(move || { + let zero = Sha256Hash::default(); + let acc = Accountant::new(&zero, None); + let mut skel = AccountantSkel::new(acc); + skel.serve(addr).unwrap(); + }); + + sleep(Duration::from_millis(30)); + + let mut acc = AccountantStub::new(addr); + let alice_keypair = generate_keypair(); + let bob_keypair = generate_keypair(); + acc.deposit(10_000, &alice_keypair).unwrap(); + acc.deposit(1_000, &bob_keypair).unwrap(); + + sleep(Duration::from_millis(30)); + let bob_pubkey = get_pubkey(&bob_keypair); + acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); + + sleep(Duration::from_millis(300)); + assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); + } +} diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 003caf6e2..209893f4a 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -1,84 +1,22 @@ -extern crate generic_array; extern crate silk; -//use log::{Event, PublicKey, Sha256Hash}; -//use std::net::TcpStream; -//use ring::signature::Ed25519KeyPair; -// -//pub struct AccountantStub { -// pub stream: TcpStream, -//} -// -//impl AccountantStub { -// pub fn new(addr: ()) -> Self { -// let mut stream = TcpStream::connect(addr).unwrap(); -// AccountantStub { -// stream: TcpString, -// } -// } -// -// pub fn deposit( -// self: &Self, -// n: u64, -// keypair: &Ed25519KeyPair, -// ) -> Result<(), SendError>> { -// use log::sign_hash; -// let event = sign_hash(n, &keypair); -// self.stream.send(&serialize(event)) -// } -// -// pub fn transfer( -// self: &mut Self, -// n: u64, -// keypair: &Ed25519KeyPair, -// pubkey: PublicKey, -// ) -> io::Result<()> { -// use log::transfer_hash; -// use generic_array::GenericArray; -// let event = transfer_hash(n, &keypair); -// self.stream.send(&serialize(event)) -// } -// -// pub fn get_balance( -// self: &mut Self, -// pubkey: PublicKey, -// ) -> io::Result<()> { -// let event = GetBalance { key: pubkey }; -// self.stream.send(&serialize(event)); -// msg = deserialize(self.sender.recv()); -// if let AccountantMsg::Balance { val } = msg { -// Ok(val) -// } else { -// Err() -// } -// } -//} - -use silk::accountant::Accountant; -use std::thread::sleep; -use std::time::Duration; -use silk::log::{generate_keypair, Sha256Hash}; -use silk::historian::ExitReason; -use generic_array::GenericArray; - fn main() { - let zero = Sha256Hash::default(); - let mut acc = Accountant::new(&zero, Some(2)); + use silk::accountant_stub::AccountantStub; + use std::thread::sleep; + use std::time::Duration; + use silk::log::{generate_keypair, get_pubkey}; + + let addr = "127.0.0.1:8000"; + let mut acc = AccountantStub::new(addr); let alice_keypair = generate_keypair(); let bob_keypair = generate_keypair(); acc.deposit(10_000, &alice_keypair).unwrap(); acc.deposit(1_000, &bob_keypair).unwrap(); sleep(Duration::from_millis(30)); - let bob_pubkey = GenericArray::clone_from_slice(bob_keypair.public_key_bytes()); + let bob_pubkey = get_pubkey(&bob_keypair); acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); - sleep(Duration::from_millis(30)); + sleep(Duration::from_millis(300)); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); - - drop(acc.historian.sender); - assert_eq!( - acc.historian.thread_hdl.join().unwrap().1, - ExitReason::RecvDisconnected - ); } diff --git a/src/lib.rs b/src/lib.rs index c30052ad9..6deb92941 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ pub mod log; pub mod historian; pub mod accountant; pub mod accountant_skel; +pub mod accountant_stub; extern crate bincode; extern crate generic_array; extern crate rayon; From 3fcc2dd944860f868a107c27bba66749ff0262b2 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 28 Feb 2018 18:04:35 -0700 Subject: [PATCH 3/4] Add testnode Fixes #20 --- Cargo.toml | 4 ++++ src/accountant.rs | 31 ++++++++++++++++++++++++------- src/accountant_skel.rs | 16 +++++++++++----- src/accountant_stub.rs | 7 ++++++- src/bin/client-demo.rs | 9 +++++++-- src/bin/testnode.rs | 13 +++++++++++++ src/historian.rs | 10 ++++++---- src/log.rs | 8 ++++++++ 8 files changed, 79 insertions(+), 19 deletions(-) create mode 100644 src/bin/testnode.rs diff --git a/Cargo.toml b/Cargo.toml index c5bd74829..23f262848 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,10 @@ path = "src/bin/demo.rs" name = "silk-client-demo" path = "src/bin/client-demo.rs" +[[bin]] +name = "silk-testnode" +path = "src/bin/testnode.rs" + [badges] codecov = { repository = "loomprotocol/silk", branch = "master", service = "github" } diff --git a/src/accountant.rs b/src/accountant.rs index 6596e7af2..968ae6d3d 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -2,7 +2,7 @@ //! event log to record transactions. Its users can deposit funds and //! transfer funds to other users. -use log::{verify_entry, Event, PublicKey, Sha256Hash, Signature}; +use log::{Event, PublicKey, Sha256Hash, Signature}; use historian::Historian; use ring::signature::Ed25519KeyPair; use std::sync::mpsc::{RecvError, SendError}; @@ -24,8 +24,9 @@ impl Accountant { } } - pub fn process_event(self: &mut Self, event: Event) { - match event { + pub fn process_event(self: &mut Self, event: &Event) { + println!("accountant: Processing event: {:?}", event); + match *event { Event::Claim { key, data, .. } => { if self.balances.contains_key(&key) { if let Some(x) = self.balances.get_mut(&key) { @@ -52,11 +53,21 @@ impl Accountant { } pub fn sync(self: &mut Self) { + let mut entries = vec![]; while let Ok(entry) = self.historian.receiver.try_recv() { - assert!(verify_entry(&entry, &self.end_hash)); - self.end_hash = entry.end_hash; - - self.process_event(entry.event); + println!("accountant: got event {:?}", entry.event); + entries.push(entry); + } + // TODO: Does this cause the historian's channel to get blocked? + //use log::verify_slice_u64; + //println!("accountant: verifying {} entries...", entries.len()); + //assert!(verify_slice_u64(&entries, &self.end_hash)); + //println!("accountant: Done verifying {} entries.", entries.len()); + if let Some(last_entry) = entries.last() { + self.end_hash = last_entry.end_hash; + } + for e in &entries { + self.process_event(&e.event); } } @@ -88,16 +99,20 @@ impl Accountant { data: u64, sig: Signature, ) -> Result<(), SendError>> { + println!("accountant: Checking funds (needs sync)..."); if self.get_balance(&from).unwrap() < data { // TODO: Replace the SendError result with a custom one. + println!("Error: Insufficient funds"); return Ok(()); } + println!("accountant: Sufficient funds."); let event = Event::Transaction { from, to, data, sig, }; + println!("accountant: Sending Transaction to historian."); self.historian.sender.send(event) } @@ -115,7 +130,9 @@ impl Accountant { } pub fn get_balance(self: &mut Self, pubkey: &PublicKey) -> Result { + println!("accountant: syncing the log..."); self.sync(); + println!("accountant: done syncing."); Ok(*self.balances.get(pubkey).unwrap_or(&0)) } } diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 008b0e7d4..7d5c39bee 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -35,14 +35,16 @@ impl AccountantSkel { AccountantSkel { obj } } - pub fn process_message(self: &mut Self, msg: Request) -> Option { + pub fn process_request(self: &mut Self, msg: Request) -> Option { match msg { Request::Deposit { key, val, sig } => { let _ = self.obj.deposit_signed(key, val, sig); None } Request::Transfer { from, to, val, sig } => { + println!("skel: Invoking transfer_signed..."); let _ = self.obj.transfer_signed(from, to, val, sig); + println!("skel: transfer_signed done."); None } Request::GetBalance { key } => { @@ -60,17 +62,21 @@ impl AccountantSkel { let listener = TcpListener::bind(addr)?; let mut buf = vec![0u8; 1024]; loop { - let (mut stream, _addr) = listener.accept()?; - - // TODO: Guard against large message DoS attack. + println!("skel: Waiting for incoming connections..."); + let (mut stream, addr) = listener.accept()?; + println!("skel: Accepted incoming connection frm {:?}.", addr); let _sz = stream.read(&mut buf)?; // TODO: Return a descriptive error message if deserialization fails. let req = deserialize(&buf).expect("deserialize request"); + println!("skel: Got request {:?}", req); - if let Some(resp) = self.process_message(req) { + println!("skel: Processing request..."); + if let Some(resp) = self.process_request(req) { + println!("skel: Writing response..."); stream.write(&serialize(&resp).expect("serialize response"))?; } + println!("skel: Done processing request."); } } } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 5c5749b1b..10568eee6 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -49,8 +49,13 @@ impl AccountantStub { ) -> io::Result { let req = Request::Transfer { from, to, val, sig }; let data = serialize(&req).unwrap(); + println!("TcpStream::connect()..."); let mut stream = TcpStream::connect(&self.addr)?; - stream.write(&data) + println!("Connected."); + println!("accountant_stub: Writing transfer message..."); + let ret = stream.write(&data); + println!("Done."); + ret } pub fn transfer( diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 209893f4a..fd8b6be2d 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -10,13 +10,18 @@ fn main() { let mut acc = AccountantStub::new(addr); let alice_keypair = generate_keypair(); let bob_keypair = generate_keypair(); + println!("Depositing..."); acc.deposit(10_000, &alice_keypair).unwrap(); acc.deposit(1_000, &bob_keypair).unwrap(); + println!("Done."); sleep(Duration::from_millis(30)); let bob_pubkey = get_pubkey(&bob_keypair); + println!("Transferring..."); acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); + println!("Done."); - sleep(Duration::from_millis(300)); - assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); + sleep(Duration::from_millis(30)); + println!("Done. Checking balance."); + println!("Balance {}", acc.get_balance(&bob_pubkey).unwrap()); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs new file mode 100644 index 000000000..8e4adb9f7 --- /dev/null +++ b/src/bin/testnode.rs @@ -0,0 +1,13 @@ +extern crate silk; + +use silk::accountant_skel::AccountantSkel; +use silk::accountant::Accountant; +use silk::log::Sha256Hash; + +fn main() { + let addr = "127.0.0.1:8000"; + let zero = Sha256Hash::default(); + let acc = Accountant::new(&zero, Some(1000)); + let mut skel = AccountantSkel::new(acc); + skel.serve(addr).unwrap(); +} diff --git a/src/historian.rs b/src/historian.rs index 80df3ba6f..0ed4fcbe7 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -10,6 +10,7 @@ use std::sync::mpsc::{Receiver, Sender}; use std::time::{Duration, SystemTime}; use log::{hash, hash_event, verify_event, Entry, Event, Sha256Hash}; use serde::Serialize; +use std::fmt::Debug; pub struct Historian { pub sender: Sender>, @@ -22,13 +23,14 @@ pub enum ExitReason { RecvDisconnected, SendDisconnected, } -fn log_event( +fn log_event( sender: &Sender>, num_hashes: &mut u64, end_hash: &mut Sha256Hash, event: Event, ) -> Result<(), (Entry, ExitReason)> { *end_hash = hash_event(end_hash, &event); + println!("historian: logging event {:?}", event); let entry = Entry { end_hash: *end_hash, num_hashes: *num_hashes, @@ -41,7 +43,7 @@ fn log_event( Ok(()) } -fn log_events( +fn log_events( receiver: &Receiver>, sender: &Sender>, num_hashes: &mut u64, @@ -82,7 +84,7 @@ fn log_events( /// A background thread that will continue tagging received Event messages and /// sending back Entry messages until either the receiver or sender channel is closed. -pub fn create_logger( +pub fn create_logger( start_hash: Sha256Hash, ms_per_tick: Option, receiver: Receiver>, @@ -112,7 +114,7 @@ pub fn create_logger( }) } -impl Historian { +impl Historian { pub fn new(start_hash: &Sha256Hash, ms_per_tick: Option) -> Self { use std::sync::mpsc::channel; let (sender, event_receiver) = channel(); diff --git a/src/log.rs b/src/log.rs index a0e6b739c..523a12afa 100644 --- a/src/log.rs +++ b/src/log.rs @@ -219,6 +219,14 @@ pub fn verify_slice(events: &[Entry], start_hash: &Sha256Hash) -> bo event_pairs.all(|(x0, x1)| verify_entry(&x1, &x0.end_hash)) } +/// Verifies the hashes and counts of a slice of events are all consistent. +pub fn verify_slice_u64(events: &[Entry], start_hash: &Sha256Hash) -> bool { + use rayon::prelude::*; + let genesis = [Entry::new_tick(Default::default(), start_hash)]; + let event_pairs = genesis.par_iter().chain(events).zip(events); + event_pairs.all(|(x0, x1)| verify_entry(&x1, &x0.end_hash)) +} + /// Verifies the hashes and events serially. Exists only for reference. pub fn verify_slice_seq(events: &[Entry], start_hash: &Sha256Hash) -> bool { let genesis = [Entry::new_tick(0, start_hash)]; From 4610de8fdd6e10e58b10b8a9d4980235829dd87a Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 28 Feb 2018 19:33:28 -0700 Subject: [PATCH 4/4] Switch to sync_channel to preserve order --- Cargo.toml | 2 +- src/accountant.rs | 7 ------- src/accountant_skel.rs | 11 ++--------- src/accountant_stub.rs | 7 +------ src/bin/client-demo.rs | 34 ++++++++++++++++++++++++++-------- src/bin/testnode.rs | 1 + src/historian.rs | 17 ++++++++--------- 7 files changed, 39 insertions(+), 40 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 23f262848..83ac0e595 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "silk" description = "A silky smooth implementation of the Loom architecture" -version = "0.2.3" +version = "0.3.0" documentation = "https://docs.rs/silk" homepage = "http://loomprotocol.com/" repository = "https://github.com/loomprotocol/silk" diff --git a/src/accountant.rs b/src/accountant.rs index 968ae6d3d..2892c3973 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -25,7 +25,6 @@ impl Accountant { } pub fn process_event(self: &mut Self, event: &Event) { - println!("accountant: Processing event: {:?}", event); match *event { Event::Claim { key, data, .. } => { if self.balances.contains_key(&key) { @@ -55,7 +54,6 @@ impl Accountant { pub fn sync(self: &mut Self) { let mut entries = vec![]; while let Ok(entry) = self.historian.receiver.try_recv() { - println!("accountant: got event {:?}", entry.event); entries.push(entry); } // TODO: Does this cause the historian's channel to get blocked? @@ -99,20 +97,17 @@ impl Accountant { data: u64, sig: Signature, ) -> Result<(), SendError>> { - println!("accountant: Checking funds (needs sync)..."); if self.get_balance(&from).unwrap() < data { // TODO: Replace the SendError result with a custom one. println!("Error: Insufficient funds"); return Ok(()); } - println!("accountant: Sufficient funds."); let event = Event::Transaction { from, to, data, sig, }; - println!("accountant: Sending Transaction to historian."); self.historian.sender.send(event) } @@ -130,9 +125,7 @@ impl Accountant { } pub fn get_balance(self: &mut Self, pubkey: &PublicKey) -> Result { - println!("accountant: syncing the log..."); self.sync(); - println!("accountant: done syncing."); Ok(*self.balances.get(pubkey).unwrap_or(&0)) } } diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 7d5c39bee..18419896c 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -42,9 +42,7 @@ impl AccountantSkel { None } Request::Transfer { from, to, val, sig } => { - println!("skel: Invoking transfer_signed..."); let _ = self.obj.transfer_signed(from, to, val, sig); - println!("skel: transfer_signed done."); None } Request::GetBalance { key } => { @@ -62,21 +60,16 @@ impl AccountantSkel { let listener = TcpListener::bind(addr)?; let mut buf = vec![0u8; 1024]; loop { - println!("skel: Waiting for incoming connections..."); - let (mut stream, addr) = listener.accept()?; - println!("skel: Accepted incoming connection frm {:?}.", addr); + //println!("skel: Waiting for incoming connections..."); + let (mut stream, _from_addr) = listener.accept()?; let _sz = stream.read(&mut buf)?; // TODO: Return a descriptive error message if deserialization fails. let req = deserialize(&buf).expect("deserialize request"); - println!("skel: Got request {:?}", req); - println!("skel: Processing request..."); if let Some(resp) = self.process_request(req) { - println!("skel: Writing response..."); stream.write(&serialize(&resp).expect("serialize response"))?; } - println!("skel: Done processing request."); } } } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 10568eee6..5c5749b1b 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -49,13 +49,8 @@ impl AccountantStub { ) -> io::Result { let req = Request::Transfer { from, to, val, sig }; let data = serialize(&req).unwrap(); - println!("TcpStream::connect()..."); let mut stream = TcpStream::connect(&self.addr)?; - println!("Connected."); - println!("accountant_stub: Writing transfer message..."); - let ret = stream.write(&data); - println!("Done."); - ret + stream.write(&data) } pub fn transfer( diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index fd8b6be2d..e2ea5e56b 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -10,18 +10,36 @@ fn main() { let mut acc = AccountantStub::new(addr); let alice_keypair = generate_keypair(); let bob_keypair = generate_keypair(); - println!("Depositing..."); - acc.deposit(10_000, &alice_keypair).unwrap(); - acc.deposit(1_000, &bob_keypair).unwrap(); + let txs = 10_000; + println!("Depositing {} units in Alice's account...", txs); + acc.deposit(txs, &alice_keypair).unwrap(); + //acc.deposit(1_000, &bob_keypair).unwrap(); println!("Done."); sleep(Duration::from_millis(30)); + let alice_pubkey = get_pubkey(&alice_keypair); let bob_pubkey = get_pubkey(&bob_keypair); - println!("Transferring..."); - acc.transfer(500, &alice_keypair, bob_pubkey).unwrap(); + println!("Transferring 1 unit {} times...", txs); + for _ in 0..txs { + acc.transfer(1, &alice_keypair, bob_pubkey).unwrap(); + } println!("Done."); - sleep(Duration::from_millis(30)); - println!("Done. Checking balance."); - println!("Balance {}", acc.get_balance(&bob_pubkey).unwrap()); + sleep(Duration::from_millis(20)); + let mut alice_val = acc.get_balance(&alice_pubkey).unwrap(); + while alice_val > 0 { + println!("Checking on Alice's Balance {}", alice_val); + sleep(Duration::from_millis(20)); + alice_val = acc.get_balance(&alice_pubkey).unwrap(); + } + println!("Done. Checking balances."); + println!( + "Alice's Final Balance {}", + acc.get_balance(&alice_pubkey).unwrap() + ); + + println!( + "Bob's Final Balance {}", + acc.get_balance(&bob_pubkey).unwrap() + ); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 8e4adb9f7..4231eda35 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -9,5 +9,6 @@ fn main() { let zero = Sha256Hash::default(); let acc = Accountant::new(&zero, Some(1000)); let mut skel = AccountantSkel::new(acc); + println!("Listening on {}", addr); skel.serve(addr).unwrap(); } diff --git a/src/historian.rs b/src/historian.rs index 0ed4fcbe7..99e2ff600 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -6,14 +6,14 @@ //! The resulting stream of entries represents ordered events in time. use std::thread::JoinHandle; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::{Receiver, SyncSender}; use std::time::{Duration, SystemTime}; use log::{hash, hash_event, verify_event, Entry, Event, Sha256Hash}; use serde::Serialize; use std::fmt::Debug; pub struct Historian { - pub sender: Sender>, + pub sender: SyncSender>, pub receiver: Receiver>, pub thread_hdl: JoinHandle<(Entry, ExitReason)>, } @@ -24,13 +24,12 @@ pub enum ExitReason { SendDisconnected, } fn log_event( - sender: &Sender>, + sender: &SyncSender>, num_hashes: &mut u64, end_hash: &mut Sha256Hash, event: Event, ) -> Result<(), (Entry, ExitReason)> { *end_hash = hash_event(end_hash, &event); - println!("historian: logging event {:?}", event); let entry = Entry { end_hash: *end_hash, num_hashes: *num_hashes, @@ -45,7 +44,7 @@ fn log_event( fn log_events( receiver: &Receiver>, - sender: &Sender>, + sender: &SyncSender>, num_hashes: &mut u64, end_hash: &mut Sha256Hash, epoch: SystemTime, @@ -88,7 +87,7 @@ pub fn create_logger( start_hash: Sha256Hash, ms_per_tick: Option, receiver: Receiver>, - sender: Sender>, + sender: SyncSender>, ) -> JoinHandle<(Entry, ExitReason)> { use std::thread; thread::spawn(move || { @@ -116,9 +115,9 @@ pub fn create_logger( impl Historian { pub fn new(start_hash: &Sha256Hash, ms_per_tick: Option) -> Self { - use std::sync::mpsc::channel; - let (sender, event_receiver) = channel(); - let (entry_sender, receiver) = channel(); + use std::sync::mpsc::sync_channel; + let (sender, event_receiver) = sync_channel(4000); + let (entry_sender, receiver) = sync_channel(4000); let thread_hdl = create_logger(*start_hash, ms_per_tick, event_receiver, entry_sender); Historian { sender,