diff --git a/src/accountant.rs b/src/accountant.rs index 8c2fb048c..df2addeef 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -15,7 +15,7 @@ use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet}; use std::result; -use std::sync::mpsc::SendError; +use std::sync::mpsc::{Receiver, SendError}; use transaction::Transaction; #[derive(Debug, PartialEq, Eq)] @@ -36,9 +36,8 @@ fn complete_transaction(balances: &mut HashMap, plan: &Plan) { } pub struct Accountant { - pub historian: Historian, - pub balances: HashMap, - pub first_id: Hash, + historian: Historian, + balances: HashMap, pending: HashMap, time_sources: HashSet, last_time: DateTime, @@ -46,7 +45,7 @@ pub struct Accountant { impl Accountant { /// Create an Accountant using an existing ledger. - pub fn new_from_entries(entries: I, ms_per_tick: Option) -> Self + pub fn new_from_entries(entries: I, ms_per_tick: Option) -> (Self, Hash) where I: IntoIterator, { @@ -61,7 +60,6 @@ impl Accountant { let mut acc = Accountant { historian: hist, balances: HashMap::new(), - first_id: start_hash, pending: HashMap::new(), time_sources: HashSet::new(), last_time: Utc.timestamp(0, 0), @@ -73,17 +71,19 @@ impl Accountant { let entry1 = entries.next().unwrap(); acc.process_verified_event(&entry1.events[0], true).unwrap(); + let mut last_id = entry1.id; for entry in entries { + last_id = entry.id; for event in entry.events { acc.process_verified_event(&event, false).unwrap(); } } - acc + (acc, last_id) } /// Create an Accountant with only a Mint. Typically used by unit tests. pub fn new(mint: &Mint, ms_per_tick: Option) -> Self { - Self::new_from_entries(mint.create_entries(), ms_per_tick) + Self::new_from_entries(mint.create_entries(), ms_per_tick).0 } fn is_deposit(allow_deposits: bool, from: &PublicKey, plan: &Plan) -> bool { @@ -94,6 +94,10 @@ impl Accountant { } } + pub fn receiver(&self) -> &Receiver { + &self.historian.receiver + } + /// Process and log the given Transaction. pub fn log_verified_transaction(&mut self, tr: Transaction) -> Result<()> { if self.get_balance(&tr.from).unwrap_or(0) < tr.tokens { diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index b8b1226eb..26af07399 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -22,8 +22,8 @@ use transaction::Transaction; use rayon::prelude::*; pub struct AccountantSkel { - pub acc: Accountant, - pub last_id: Hash, + acc: Accountant, + last_id: Hash, writer: W, } @@ -32,7 +32,7 @@ pub struct AccountantSkel { pub enum Request { Transaction(Transaction), GetBalance { key: PublicKey }, - GetId { is_last: bool }, + GetLastId, } impl Request { @@ -54,23 +54,22 @@ fn filter_valid_requests(reqs: Vec<(Request, SocketAddr)>) -> Vec<(Request, Sock pub enum Response { Balance { key: PublicKey, val: Option }, Entries { entries: Vec }, - Id { id: Hash, is_last: bool }, + LastId { id: Hash }, } impl AccountantSkel { /// Create a new AccountantSkel that wraps the given Accountant. - pub fn new(acc: Accountant, w: W) -> Self { - let last_id = acc.first_id; + pub fn new(acc: Accountant, last_id: Hash, writer: W) -> Self { AccountantSkel { acc, last_id, - writer: w, + writer, } } /// Process any Entry items that have been published by the Historian. pub fn sync(&mut self) -> Hash { - while let Ok(entry) = self.acc.historian.receiver.try_recv() { + while let Ok(entry) = self.acc.receiver().try_recv() { self.last_id = entry.id; writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); } @@ -90,14 +89,7 @@ impl AccountantSkel { let val = self.acc.get_balance(&key); Some(Response::Balance { key, val }) } - Request::GetId { is_last } => Some(Response::Id { - id: if is_last { - self.sync() - } else { - self.acc.first_id - }, - is_last, - }), + Request::GetLastId => Some(Response::LastId { id: self.sync() }), } } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 1c3c36d2f..518c08faf 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -65,26 +65,21 @@ impl AccountantStub { Ok(None) } - /// Request the first or last Entry ID from the server. - fn get_id(&self, is_last: bool) -> io::Result { - let req = Request::GetId { is_last }; - let data = serialize(&req).expect("serialize GetId"); - self.socket.send_to(&data, &self.addr)?; - let mut buf = vec![0u8; 1024]; - self.socket.recv_from(&mut buf)?; - let resp = deserialize(&buf).expect("deserialize Id"); - if let Response::Id { id, .. } = resp { - return Ok(id); - } - Ok(Default::default()) - } - /// Request the last Entry ID from the server. This method blocks /// until the server sends a response. At the time of this writing, /// it also has the side-effect of causing the server to log any /// entries that have been published by the Historian. pub fn get_last_id(&self) -> io::Result { - self.get_id(true) + let req = Request::GetLastId; + let data = serialize(&req).expect("serialize GetId"); + self.socket.send_to(&data, &self.addr)?; + let mut buf = vec![0u8; 1024]; + self.socket.recv_from(&mut buf)?; + let resp = deserialize(&buf).expect("deserialize Id"); + if let Response::LastId { id } = resp { + return Ok(id); + } + Ok(Default::default()) } } @@ -110,7 +105,7 @@ mod tests { let acc = Accountant::new(&alice, Some(30)); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let acc = Arc::new(Mutex::new(AccountantSkel::new(acc, sink()))); + let acc = Arc::new(Mutex::new(AccountantSkel::new(acc, alice.seed(), sink()))); let _threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap(); sleep(Duration::from_millis(300)); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index a8daaec4c..1e562b74f 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -14,9 +14,9 @@ fn main() { .lock() .lines() .map(|line| serde_json::from_str(&line.unwrap()).unwrap()); - let acc = Accountant::new_from_entries(entries, Some(1000)); + let (acc, last_id) = Accountant::new_from_entries(entries, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); - let skel = Arc::new(Mutex::new(AccountantSkel::new(acc, stdout()))); + let skel = Arc::new(Mutex::new(AccountantSkel::new(acc, last_id, stdout()))); eprintln!("Listening on {}", addr); let threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap(); for t in threads { diff --git a/src/recorder.rs b/src/recorder.rs index 40536458c..faf075935 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -34,11 +34,11 @@ pub struct Recorder { } impl Recorder { - pub fn new(receiver: Receiver, sender: SyncSender, start_hash: Hash) -> Self { + pub fn new(receiver: Receiver, sender: SyncSender, last_hash: Hash) -> Self { Recorder { receiver, sender, - last_hash: start_hash, + last_hash, events: vec![], num_hashes: 0, num_ticks: 0,