From 98ae80f4ed9a11816961c4c70834393c0d609f6b Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 9 May 2018 09:26:58 -0600 Subject: [PATCH] Hoist historian --- src/tpu.rs | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/tpu.rs b/src/tpu.rs index f97916ea3e..d52088adde 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -35,18 +35,16 @@ use transaction::Transaction; struct AccountingStage { acc: Mutex, historian_input: Mutex>, - historian: Historian, entry_info_subscribers: Mutex>, } impl AccountingStage { /// Create a new Tpu that wraps the given Accountant. - pub fn new(acc: Accountant, historian_input: SyncSender, historian: Historian) -> Self { + pub fn new(acc: Accountant, historian_input: SyncSender) -> Self { AccountingStage { acc: Mutex::new(acc), entry_info_subscribers: Mutex::new(vec![]), historian_input: Mutex::new(historian_input), - historian, } } @@ -122,6 +120,7 @@ impl AccountingStage { pub struct Tpu { accounting: AccountingStage, + historian: Historian, } #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] @@ -165,8 +164,11 @@ pub enum Response { impl Tpu { /// Create a new Tpu that wraps the given Accountant. pub fn new(acc: Accountant, historian_input: SyncSender, historian: Historian) -> Self { - let accounting = AccountingStage::new(acc, historian_input, historian); - Tpu { accounting } + let accounting = AccountingStage::new(acc, historian_input); + Tpu { + accounting, + historian, + } } fn update_entry(obj: &SharedTpu, writer: &Arc>, entry: &Entry) { @@ -187,15 +189,14 @@ impl Tpu { fn receive_all(obj: &SharedTpu, writer: &Arc>) -> Result> { //TODO implement a serialize for channel that does this without allocations let mut l = vec![]; - let entry = obj.accounting - .historian + let entry = obj.historian .output .lock() .unwrap() .recv_timeout(Duration::new(1, 0))?; Self::update_entry(obj, writer, &entry); l.push(entry); - while let Ok(entry) = obj.accounting.historian.receive() { + while let Ok(entry) = obj.historian.receive() { Self::update_entry(obj, writer, &entry); l.push(entry); } @@ -865,7 +866,7 @@ mod tests { let acc = Accountant::new(&mint); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &mint.last_id(), None); - let stage = AccountingStage::new(acc, input, historian); + let stage = AccountingStage::new(acc, input); // Process a batch that includes a transaction that receives two tokens. let alice = KeyPair::new(); @@ -880,7 +881,7 @@ mod tests { // Collect the ledger and feed it to a new accountant. drop(stage.historian_input); - let entries: Vec = stage.historian.output.lock().unwrap().iter().collect(); + let entries: Vec = historian.output.lock().unwrap().iter().collect(); // Assert the user holds one token, not two. If the server only output one // entry, then the second transaction will be rejected, because it drives @@ -1179,7 +1180,7 @@ mod bench { let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &mint.last_id(), None); - let stage = AccountingStage::new(acc, input, historian); + let stage = AccountingStage::new(acc, input); let now = Instant::now(); assert!(stage.process_events(req_vers).is_ok()); @@ -1189,7 +1190,7 @@ mod bench { // Ensure that all transactions were successfully logged. drop(stage.historian_input); - let entries: Vec = stage.historian.output.lock().unwrap().iter().collect(); + let entries: Vec = historian.output.lock().unwrap().iter().collect(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].events.len(), txs as usize);