From 2b788d06b744c2fcb69732b4c23d722623ad00aa Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 2 Apr 2018 14:41:07 -0600 Subject: [PATCH] Move the historian up to accountant_skel --- src/accountant.rs | 88 ++++++++++-------------------------------- src/accountant_skel.rs | 18 +++++++-- src/accountant_stub.rs | 11 +++++- src/bin/testnode.rs | 11 +++++- 4 files changed, 52 insertions(+), 76 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index d459dc332..561b317c7 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -7,15 +7,12 @@ use chrono::prelude::*; use entry::Entry; use event::Event; use hash::Hash; -use historian::Historian; use mint::Mint; use plan::{Payment, Plan, Witness}; -use recorder::Signal; use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet}; use std::result; -use std::sync::mpsc::{Receiver, SendError}; use transaction::Transaction; #[derive(Debug, PartialEq, Eq)] @@ -23,7 +20,6 @@ pub enum AccountingError { InsufficientFunds, InvalidTransfer, InvalidTransferSignature, - SendError, } pub type Result = result::Result; @@ -34,7 +30,6 @@ fn apply_payment(balances: &mut HashMap, payment: &Payment) { } pub struct Accountant { - historian: Historian, balances: HashMap, pending: HashMap, signatures: HashSet, @@ -44,16 +39,10 @@ pub struct Accountant { impl Accountant { /// Create an Accountant using a deposit. - pub fn new_from_deposit( - start_hash: &Hash, - deposit: &Payment, - ms_per_tick: Option, - ) -> Self { + pub fn new_from_deposit(deposit: &Payment) -> Self { let mut balances = HashMap::new(); apply_payment(&mut balances, &deposit); - let historian = Historian::new(&start_hash, ms_per_tick); Accountant { - historian, balances, pending: HashMap::new(), signatures: HashSet::new(), @@ -63,16 +52,16 @@ impl Accountant { } /// Create an Accountant with only a Mint. Typically used by unit tests. - pub fn new(mint: &Mint, ms_per_tick: Option) -> Self { + pub fn new(mint: &Mint) -> Self { let deposit = Payment { to: mint.pubkey(), tokens: mint.tokens, }; - Self::new_from_deposit(&mint.seed(), &deposit, ms_per_tick) + Self::new_from_deposit(&deposit) } /// Create an Accountant using an existing ledger. - pub fn new_from_entries(entries: I, ms_per_tick: Option) -> (Self, Hash) + pub fn new_from_entries(entries: I) -> (Self, Hash) where I: IntoIterator, { @@ -80,8 +69,7 @@ impl Accountant { // The first item in the ledger is required to be an entry with zero num_hashes, // which implies its id can be used as the ledger's seed. - let entry0 = entries.next().unwrap(); - let start_hash = entry0.id; + entries.next().unwrap(); // The second item in the ledger is a special transaction where the to and from // fields are the same. That entry should be treated as a deposit, not a @@ -93,7 +81,7 @@ impl Accountant { None }; - let mut acc = Self::new_from_deposit(&start_hash, &deposit.unwrap(), ms_per_tick); + let mut acc = Self::new_from_deposit(&deposit.unwrap()); let mut last_id = entry1.id; for entry in entries { @@ -105,30 +93,13 @@ impl Accountant { (acc, last_id) } - pub fn receiver(&self) -> &Receiver { - &self.historian.receiver - } - - /// Process and log the given Transaction. - pub fn log_verified_transaction(&mut self, tr: Transaction) -> Result<()> { - self.process_verified_transaction(&tr)?; - if let Err(SendError(_)) = self.historian - .sender - .send(Signal::Event(Event::Transaction(tr))) - { - return Err(AccountingError::SendError); - } - - Ok(()) - } - /// Verify and process the given Transaction. - pub fn log_transaction(&mut self, tr: Transaction) -> Result<()> { + pub fn process_transaction(&mut self, tr: Transaction) -> Result<()> { if !tr.verify() { return Err(AccountingError::InvalidTransfer); } - self.log_verified_transaction(tr) + self.process_verified_transaction(&tr) } fn reserve_signature(&mut self, sig: &Signature) -> bool { @@ -231,7 +202,7 @@ impl Accountant { ) -> Result { let tr = Transaction::new(keypair, to, n, last_id); let sig = tr.sig; - self.log_transaction(tr).map(|_| sig) + self.process_transaction(tr).map(|_| sig) } /// Create, sign, and process a postdated Transaction from `keypair` @@ -247,7 +218,7 @@ impl Accountant { ) -> Result { let tr = Transaction::new_on_date(keypair, to, dt, n, last_id); let sig = tr.sig; - self.log_transaction(tr).map(|_| sig) + self.process_transaction(tr).map(|_| sig) } pub fn get_balance(&self, pubkey: &PublicKey) -> Option { @@ -258,14 +229,13 @@ impl Accountant { #[cfg(test)] mod tests { use super::*; - use recorder::ExitReason; use signature::KeyPairUtil; #[test] fn test_accountant() { let alice = Mint::new(10_000); let bob_pubkey = KeyPair::new().pubkey(); - let mut acc = Accountant::new(&alice, Some(2)); + let mut acc = Accountant::new(&alice); acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed()) .unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); @@ -273,18 +243,12 @@ mod tests { acc.transfer(500, &alice.keypair(), bob_pubkey, alice.seed()) .unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); - - drop(acc.historian.sender); - assert_eq!( - acc.historian.thread_hdl.join().unwrap(), - ExitReason::RecvDisconnected - ); } #[test] fn test_invalid_transfer() { let alice = Mint::new(11_000); - let mut acc = Accountant::new(&alice, Some(2)); + let mut acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.seed()) .unwrap(); @@ -296,25 +260,19 @@ mod tests { let alice_pubkey = alice.keypair().pubkey(); assert_eq!(acc.get_balance(&alice_pubkey).unwrap(), 10_000); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); - - drop(acc.historian.sender); - assert_eq!( - acc.historian.thread_hdl.join().unwrap(), - ExitReason::RecvDisconnected - ); } #[test] fn test_overspend_attack() { let alice = Mint::new(1); - let mut acc = Accountant::new(&alice, None); + let mut acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let mut tr = Transaction::new(&alice.keypair(), bob_pubkey, 1, alice.seed()); if let Plan::Pay(ref mut payment) = tr.plan { payment.tokens = 2; // <-- attack! } assert_eq!( - acc.log_transaction(tr.clone()), + acc.process_transaction(tr.clone()), Err(AccountingError::InvalidTransfer) ); @@ -323,7 +281,7 @@ mod tests { payment.tokens = 0; // <-- whoops! } assert_eq!( - acc.log_transaction(tr.clone()), + acc.process_transaction(tr.clone()), Err(AccountingError::InvalidTransfer) ); } @@ -331,24 +289,18 @@ mod tests { #[test] fn test_transfer_to_newb() { let alice = Mint::new(10_000); - let mut acc = Accountant::new(&alice, Some(2)); + let mut acc = Accountant::new(&alice); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); acc.transfer(500, &alice_keypair, bob_pubkey, alice.seed()) .unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500); - - drop(acc.historian.sender); - assert_eq!( - acc.historian.thread_hdl.join().unwrap(), - ExitReason::RecvDisconnected - ); } #[test] fn test_transfer_on_date() { let alice = Mint::new(1); - let mut acc = Accountant::new(&alice, Some(2)); + let mut acc = Accountant::new(&alice); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); @@ -374,7 +326,7 @@ mod tests { #[test] fn test_transfer_after_date() { let alice = Mint::new(1); - let mut acc = Accountant::new(&alice, Some(2)); + let mut acc = Accountant::new(&alice); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); @@ -391,7 +343,7 @@ mod tests { #[test] fn test_cancel_transfer() { let alice = Mint::new(1); - let mut acc = Accountant::new(&alice, Some(2)); + let mut acc = Accountant::new(&alice); let alice_keypair = alice.keypair(); let bob_pubkey = KeyPair::new().pubkey(); let dt = Utc::now(); @@ -417,7 +369,7 @@ mod tests { #[test] fn test_duplicate_event_signature() { let alice = Mint::new(1); - let mut acc = Accountant::new(&alice, None); + let mut acc = Accountant::new(&alice); let sig = Signature::default(); assert!(acc.reserve_signature(&sig)); assert!(!acc.reserve_signature(&sig)); diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 26af07399..8893fcb1f 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -3,8 +3,11 @@ //! in flux. Clients should use AccountantStub to interact with it. use accountant::Accountant; +use historian::Historian; +use recorder::Signal; use bincode::{deserialize, serialize}; use entry::Entry; +use event::Event; use hash::Hash; use result::Result; use serde_json; @@ -13,7 +16,7 @@ use std::default::Default; use std::io::Write; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; +use std::sync::mpsc::{channel, SendError}; use std::sync::{Arc, Mutex}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; @@ -25,6 +28,7 @@ pub struct AccountantSkel { acc: Accountant, last_id: Hash, writer: W, + historian: Historian, } #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] @@ -59,17 +63,18 @@ pub enum Response { impl AccountantSkel { /// Create a new AccountantSkel that wraps the given Accountant. - pub fn new(acc: Accountant, last_id: Hash, writer: W) -> Self { + pub fn new(acc: Accountant, last_id: Hash, writer: W, historian: Historian) -> Self { AccountantSkel { acc, last_id, writer, + historian, } } /// Process any Entry items that have been published by the Historian. pub fn sync(&mut self) -> Hash { - while let Ok(entry) = self.acc.receiver().try_recv() { + while let Ok(entry) = self.historian.receiver.try_recv() { self.last_id = entry.id; writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); } @@ -80,8 +85,13 @@ impl AccountantSkel { pub fn log_verified_request(&mut self, msg: Request) -> Option { match msg { Request::Transaction(tr) => { - if let Err(err) = self.acc.log_verified_transaction(tr) { + if let Err(err) = self.acc.process_verified_transaction(&tr) { eprintln!("Transaction error: {:?}", err); + } else if let Err(SendError(_)) = self.historian + .sender + .send(Signal::Event(Event::Transaction(tr))) + { + eprintln!("Channel send error"); } None } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 518c08faf..cd8b1c87a 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -87,6 +87,7 @@ impl AccountantStub { mod tests { use super::*; use accountant::Accountant; + use historian::Historian; use accountant_skel::AccountantSkel; use mint::Mint; use signature::{KeyPair, KeyPairUtil}; @@ -102,10 +103,16 @@ mod tests { let addr = "127.0.0.1:9000"; let send_addr = "127.0.0.1:9001"; let alice = Mint::new(10_000); - let acc = Accountant::new(&alice, Some(30)); + let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let acc = Arc::new(Mutex::new(AccountantSkel::new(acc, alice.seed(), sink()))); + let historian = Historian::new(&alice.seed(), Some(30)); + let acc = Arc::new(Mutex::new(AccountantSkel::new( + acc, + alice.seed(), + sink(), + historian, + ))); let _threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap(); sleep(Duration::from_millis(300)); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 1e562b74f..346da9f70 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -2,6 +2,7 @@ extern crate serde_json; extern crate solana; use solana::accountant::Accountant; +use solana::historian::Historian; use solana::accountant_skel::AccountantSkel; use std::io::{self, stdout, BufRead}; use std::sync::atomic::AtomicBool; @@ -14,9 +15,15 @@ fn main() { .lock() .lines() .map(|line| serde_json::from_str(&line.unwrap()).unwrap()); - let (acc, last_id) = Accountant::new_from_entries(entries, Some(1000)); + let (acc, last_id) = Accountant::new_from_entries(entries); + let historian = Historian::new(&last_id, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); - let skel = Arc::new(Mutex::new(AccountantSkel::new(acc, last_id, stdout()))); + let skel = Arc::new(Mutex::new(AccountantSkel::new( + acc, + last_id, + stdout(), + historian, + ))); eprintln!("Listening on {}", addr); let threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap(); for t in threads {