From 17cc9ab07f9594e0a16289734afa6e401553ae14 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 14:19:11 -0600 Subject: [PATCH] Rename Historian to RecordStage Historian was a legacy name. The new name reflects the new pipelined architecture. --- src/event_processor.rs | 12 ++++++------ src/lib.rs | 2 +- src/{historian.rs => record_stage.rs} | 28 +++++++++++++-------------- src/rpu.rs | 6 +++--- src/tvu.rs | 6 +++--- 5 files changed, 27 insertions(+), 27 deletions(-) rename src/{historian.rs => record_stage.rs} (79%) diff --git a/src/event_processor.rs b/src/event_processor.rs index 4f30def1f..3ce63d27b 100644 --- a/src/event_processor.rs +++ b/src/event_processor.rs @@ -4,7 +4,7 @@ use accountant::Accountant; use entry::Entry; use event::Event; use hash::Hash; -use historian::Historian; +use record_stage::RecordStage; use recorder::Signal; use result::Result; use std::sync::mpsc::{channel, Sender}; @@ -14,7 +14,7 @@ use std::time::Duration; pub struct EventProcessor { pub accountant: Arc, historian_input: Mutex>, - historian: Mutex, + record_stage: Mutex, pub start_hash: Hash, pub tick_duration: Option, } @@ -23,11 +23,11 @@ impl EventProcessor { /// Create a new stage of the TPU for event and transaction processing pub fn new(accountant: Accountant, start_hash: &Hash, tick_duration: Option) -> Self { let (historian_input, event_receiver) = channel(); - let historian = Historian::new(event_receiver, start_hash, tick_duration); + let record_stage = RecordStage::new(event_receiver, start_hash, tick_duration); EventProcessor { accountant: Arc::new(accountant), historian_input: Mutex::new(historian_input), - historian: Mutex::new(historian), + record_stage: Mutex::new(record_stage), start_hash: *start_hash, tick_duration, } @@ -35,14 +35,14 @@ impl EventProcessor { /// Process the transactions in parallel and then log the successful ones. pub fn process_events(&self, events: Vec) -> Result { - let historian = self.historian.lock().unwrap(); + let record_stage = self.record_stage.lock().unwrap(); let results = self.accountant.process_verified_events(events); let events = results.into_iter().filter_map(|x| x.ok()).collect(); let sender = self.historian_input.lock().unwrap(); sender.send(Signal::Events(events))?; // Wait for the historian to tag our Events with an ID and then register it. - let entry = historian.entry_receiver.recv()?; + let entry = record_stage.entry_receiver.recv()?; self.accountant.register_entry_id(&entry.id); Ok(entry) } diff --git a/src/lib.rs b/src/lib.rs index 6d50c18e3..7099ef43d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,12 +9,12 @@ pub mod erasure; pub mod event; pub mod event_processor; pub mod hash; -pub mod historian; pub mod ledger; pub mod logger; pub mod mint; pub mod packet; pub mod plan; +pub mod record_stage; pub mod recorder; pub mod request; pub mod request_processor; diff --git a/src/historian.rs b/src/record_stage.rs similarity index 79% rename from src/historian.rs rename to src/record_stage.rs index 553d3ff6e..072983fd0 100644 --- a/src/historian.rs +++ b/src/record_stage.rs @@ -1,4 +1,4 @@ -//! The `historian` module provides a microservice for generating a Proof of History. +//! The `record_stage` implements the Record stage of the TPU. //! It manages a thread containing a Proof of History Recorder. use entry::Entry; @@ -8,12 +8,12 @@ use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::thread::{spawn, JoinHandle}; use std::time::{Duration, Instant}; -pub struct Historian { +pub struct RecordStage { pub entry_receiver: Receiver, pub thread_hdl: JoinHandle, } -impl Historian { +impl RecordStage { pub fn new( event_receiver: Receiver, start_hash: &Hash, @@ -22,7 +22,7 @@ impl Historian { let (entry_sender, entry_receiver) = channel(); let thread_hdl = Self::create_recorder(*start_hash, tick_duration, event_receiver, entry_sender); - Historian { + RecordStage { entry_receiver, thread_hdl, } @@ -65,7 +65,7 @@ mod tests { fn test_historian() { let (input, event_receiver) = channel(); let zero = Hash::default(); - let hist = Historian::new(event_receiver, &zero, None); + let record_stage = RecordStage::new(event_receiver, &zero, None); input.send(Signal::Tick).unwrap(); sleep(Duration::new(0, 1_000_000)); @@ -73,9 +73,9 @@ mod tests { sleep(Duration::new(0, 1_000_000)); input.send(Signal::Tick).unwrap(); - let entry0 = hist.entry_receiver.recv().unwrap(); - let entry1 = hist.entry_receiver.recv().unwrap(); - let entry2 = hist.entry_receiver.recv().unwrap(); + let entry0 = record_stage.entry_receiver.recv().unwrap(); + let entry1 = record_stage.entry_receiver.recv().unwrap(); + let entry2 = record_stage.entry_receiver.recv().unwrap(); assert_eq!(entry0.num_hashes, 0); assert_eq!(entry1.num_hashes, 0); @@ -83,7 +83,7 @@ mod tests { drop(input); assert_eq!( - hist.thread_hdl.join().unwrap(), + record_stage.thread_hdl.join().unwrap(), ExitReason::RecvDisconnected ); @@ -94,11 +94,11 @@ mod tests { fn test_historian_closed_sender() { let (input, event_receiver) = channel(); let zero = Hash::default(); - let hist = Historian::new(event_receiver, &zero, None); - drop(hist.entry_receiver); + let record_stage = RecordStage::new(event_receiver, &zero, None); + drop(record_stage.entry_receiver); input.send(Signal::Tick).unwrap(); assert_eq!( - hist.thread_hdl.join().unwrap(), + record_stage.thread_hdl.join().unwrap(), ExitReason::SendDisconnected ); } @@ -108,11 +108,11 @@ mod tests { fn test_ticking_historian() { let (input, event_receiver) = channel(); let zero = Hash::default(); - let hist = Historian::new(event_receiver, &zero, Some(Duration::from_millis(20))); + let record_stage = RecordStage::new(event_receiver, &zero, Some(Duration::from_millis(20))); sleep(Duration::from_millis(900)); input.send(Signal::Tick).unwrap(); drop(input); - let entries: Vec = hist.entry_receiver.iter().collect(); + let entries: Vec = record_stage.entry_receiver.iter().collect(); assert!(entries.len() > 1); // Ensure the ID is not the seed. diff --git a/src/rpu.rs b/src/rpu.rs index 0690d6069..8019f7d0e 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -6,8 +6,8 @@ use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; use event_processor::EventProcessor; -use historian::Historian; use packet; +use record_stage::RecordStage; use request_processor::RequestProcessor; use request_stage::RequestStage; use result::Result; @@ -95,7 +95,7 @@ impl Rpu { blob_recycler.clone(), ); - let historian_stage = Historian::new( + let record_stage = RecordStage::new( request_stage.signal_receiver, &self.event_processor.start_hash, self.event_processor.tick_duration, @@ -108,7 +108,7 @@ impl Rpu { broadcast_sender, blob_recycler.clone(), Mutex::new(writer), - historian_stage.entry_receiver, + record_stage.entry_receiver, ); let broadcast_socket = UdpSocket::bind(local)?; diff --git a/src/tvu.rs b/src/tvu.rs index f05e9c2dc..4e33bb77f 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -6,9 +6,9 @@ use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; use event_processor::EventProcessor; -use historian::Historian; use ledger; use packet; +use record_stage::RecordStage; use request_processor::RequestProcessor; use request_stage::RequestStage; use result::Result; @@ -180,7 +180,7 @@ impl Tvu { blob_recycler.clone(), ); - let historian_stage = Historian::new( + let record_stage = RecordStage::new( request_stage.signal_receiver, &obj.event_processor.start_hash, obj.event_processor.tick_duration, @@ -189,7 +189,7 @@ impl Tvu { let t_write = Self::drain_service( obj.event_processor.accountant.clone(), exit.clone(), - historian_stage.entry_receiver, + record_stage.entry_receiver, ); let t_responder = streamer::responder(