diff --git a/src/accounting_stage.rs b/src/accounting_stage.rs index 6837c6ad96..74bea95efc 100644 --- a/src/accounting_stage.rs +++ b/src/accounting_stage.rs @@ -5,36 +5,52 @@ use bincode::serialize; use entry::Entry; use event::Event; use hash::Hash; +use historian::Historian; use recorder::Signal; use result::Result; use signature::PublicKey; use std::net::{SocketAddr, UdpSocket}; -use std::sync::mpsc::Sender; -use std::sync::Mutex; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::{Arc, Mutex}; use transaction::Transaction; pub struct AccountingStage { + pub output: Arc>>, + entry_sender: Arc>>, pub acc: Mutex, historian_input: Mutex>, + historian: Mutex, entry_info_subscribers: Mutex>, } impl AccountingStage { /// Create a new Tpu that wraps the given Accountant. - pub fn new(acc: Accountant, historian_input: Sender) -> Self { + pub fn new(acc: Accountant, historian_input: Sender, historian: Historian) -> Self { + let (entry_sender, output) = channel(); AccountingStage { + output: Arc::new(Mutex::new(output)), + entry_sender: Arc::new(Mutex::new(entry_sender)), acc: Mutex::new(acc), entry_info_subscribers: Mutex::new(vec![]), historian_input: Mutex::new(historian_input), + historian: Mutex::new(historian), } } /// Process the transactions in parallel and then log the successful ones. pub fn process_events(&self, events: Vec) -> Result<()> { - let results = self.acc.lock().unwrap().process_verified_events(events); + let acc = self.acc.lock().unwrap(); + let historian = self.historian.lock().unwrap(); + let results = acc.process_verified_events(events); let events = results.into_iter().filter_map(|x| x.ok()).collect(); let sender = self.historian_input.lock().unwrap(); sender.send(Signal::Events(events))?; + + // Wait for the historian to tag our Events with an ID and then register it. + let entry = historian.output.lock().unwrap().recv()?; + acc.register_entry_id(&entry.id); + self.entry_sender.lock().unwrap().send(entry)?; + debug!("after historian_input"); Ok(()) } @@ -156,7 +172,7 @@ mod tests { let acc = Accountant::new(&mint); let (input, event_receiver) = channel(); let historian = Historian::new(event_receiver, &mint.last_id(), None); - let stage = AccountingStage::new(acc, input); + let stage = AccountingStage::new(acc, input, historian); // Process a batch that includes a transaction that receives two tokens. let alice = KeyPair::new(); @@ -170,8 +186,8 @@ mod tests { assert!(stage.process_events(events).is_ok()); // Collect the ledger and feed it to a new accountant. - drop(stage.historian_input); - let entries: Vec = historian.output.lock().unwrap().iter().collect(); + drop(stage.entry_sender); + let entries: Vec = stage.output.lock().unwrap().iter().collect(); // Assert the user holds one token, not two. If the server only output one // entry, then the second transaction will be rejected, because it drives @@ -247,7 +263,7 @@ mod bench { let (input, event_receiver) = channel(); let historian = Historian::new(event_receiver, &mint.last_id(), None); - let stage = AccountingStage::new(acc, input); + let stage = AccountingStage::new(acc, input, historian); let now = Instant::now(); assert!(stage.process_events(events).is_ok()); @@ -257,7 +273,7 @@ mod bench { // Ensure that all transactions were successfully logged. drop(stage.historian_input); - let entries: Vec = historian.output.lock().unwrap().iter().collect(); + let entries: Vec = stage.output.lock().unwrap().iter().collect(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].events.len(), txs as usize); diff --git a/src/tpu.rs b/src/tpu.rs index 579a9359f2..406e3b8913 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -32,7 +32,6 @@ use timing; pub struct Tpu { accounting: AccountingStage, - historian: Historian, } type SharedTpu = Arc; @@ -40,11 +39,8 @@ type SharedTpu = Arc; impl Tpu { /// Create a new Tpu that wraps the given Accountant. pub fn new(acc: Accountant, historian_input: Sender, historian: Historian) -> Self { - let accounting = AccountingStage::new(acc, historian_input); - Tpu { - accounting, - historian, - } + let accounting = AccountingStage::new(acc, historian_input, historian); + Tpu { accounting } } fn update_entry(obj: &SharedTpu, writer: &Arc>, entry: &Entry) { @@ -65,14 +61,14 @@ impl Tpu { fn receive_all(obj: &SharedTpu, writer: &Arc>) -> Result> { //TODO implement a serialize for channel that does this without allocations let mut l = vec![]; - let entry = obj.historian + let entry = obj.accounting .output .lock() .unwrap() .recv_timeout(Duration::new(1, 0))?; Self::update_entry(obj, writer, &entry); l.push(entry); - while let Ok(entry) = obj.historian.receive() { + while let Ok(entry) = obj.accounting.output.lock().unwrap().try_recv() { Self::update_entry(obj, writer, &entry); l.push(entry); }