Rename sender/receiver to input/output

This commit is contained in:
Greg Fitzgerald 2018-05-02 15:54:53 -06:00
parent 48d94143e7
commit c5cc91443e
3 changed files with 29 additions and 29 deletions

View File

@ -105,7 +105,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
/// Process any Entry items that have been published by the Historian. /// Process any Entry items that have been published by the Historian.
pub fn sync(&mut self) -> Hash { pub fn sync(&mut self) -> Hash {
while let Ok(entry) = self.historian.receiver.try_recv() { while let Ok(entry) = self.historian.output.try_recv() {
self.last_id = entry.id; self.last_id = entry.id;
self.acc.register_entry_id(&self.last_id); self.acc.register_entry_id(&self.last_id);
writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
@ -215,14 +215,14 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
for result in self.acc.process_verified_transactions(trs) { for result in self.acc.process_verified_transactions(trs) {
if let Ok(tr) = result { if let Ok(tr) = result {
self.historian self.historian
.sender .input
.send(Signal::Event(Event::Transaction(tr)))?; .send(Signal::Event(Event::Transaction(tr)))?;
} }
} }
// Let validators know they should not attempt to process additional // Let validators know they should not attempt to process additional
// transactions in parallel. // transactions in parallel.
self.historian.sender.send(Signal::Tick)?; self.historian.input.send(Signal::Tick)?;
// Process the remaining requests serially. // Process the remaining requests serially.
let rsps = reqs.into_iter() let rsps = reqs.into_iter()
@ -545,9 +545,9 @@ mod tests {
assert!(skel.process_packets(req_vers).is_ok()); assert!(skel.process_packets(req_vers).is_ok());
// Collect the ledger and feed it to a new accountant. // Collect the ledger and feed it to a new accountant.
skel.historian.sender.send(Signal::Tick).unwrap(); skel.historian.input.send(Signal::Tick).unwrap();
drop(skel.historian.sender); drop(skel.historian.input);
let entries: Vec<Entry> = skel.historian.receiver.iter().collect(); let entries: Vec<Entry> = skel.historian.output.iter().collect();
// Assert the user holds one token, not two. If the server only output one // Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives // entry, then the second transaction will be rejected, because it drives
@ -800,8 +800,8 @@ mod bench {
let tps = txs as f64 / sec; let tps = txs as f64 / sec;
// Ensure that all transactions were successfully logged. // Ensure that all transactions were successfully logged.
drop(skel.historian.sender); drop(skel.historian.input);
let entries: Vec<Entry> = skel.historian.receiver.iter().collect(); let entries: Vec<Entry> = skel.historian.output.iter().collect();
assert_eq!(entries.len(), 1); assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize); assert_eq!(entries[0].events.len(), txs as usize);

View File

@ -17,7 +17,7 @@ fn create_ledger(hist: &Historian, seed: &Hash) -> Result<(), SendError<Signal>>
let keypair = KeyPair::new(); let keypair = KeyPair::new();
let tr = Transaction::new(&keypair, keypair.pubkey(), 42, *seed); let tr = Transaction::new(&keypair, keypair.pubkey(), 42, *seed);
let signal0 = Signal::Event(Event::Transaction(tr)); let signal0 = Signal::Event(Event::Transaction(tr));
hist.sender.send(signal0)?; hist.input.send(signal0)?;
sleep(Duration::from_millis(10)); sleep(Duration::from_millis(10));
Ok(()) Ok(())
} }
@ -26,8 +26,8 @@ fn main() {
let seed = Hash::default(); let seed = Hash::default();
let hist = Historian::new(&seed, Some(10)); let hist = Historian::new(&seed, Some(10));
create_ledger(&hist, &seed).expect("send error"); create_ledger(&hist, &seed).expect("send error");
drop(hist.sender); drop(hist.input);
let entries: Vec<Entry> = hist.receiver.iter().collect(); let entries: Vec<Entry> = hist.output.iter().collect();
for entry in &entries { for entry in &entries {
println!("{:?}", entry); println!("{:?}", entry);
} }

View File

@ -9,20 +9,20 @@ use std::thread::{spawn, JoinHandle};
use std::time::Instant; use std::time::Instant;
pub struct Historian { pub struct Historian {
pub sender: SyncSender<Signal>, pub input: SyncSender<Signal>,
pub receiver: Receiver<Entry>, pub output: Receiver<Entry>,
pub thread_hdl: JoinHandle<ExitReason>, pub thread_hdl: JoinHandle<ExitReason>,
} }
impl Historian { impl Historian {
pub fn new(start_hash: &Hash, ms_per_tick: Option<u64>) -> Self { pub fn new(start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
let (sender, event_receiver) = sync_channel(10_000); let (input, event_receiver) = sync_channel(10_000);
let (entry_sender, receiver) = sync_channel(10_000); let (entry_sender, output) = sync_channel(10_000);
let thread_hdl = let thread_hdl =
Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender);
Historian { Historian {
sender, input,
receiver, output,
thread_hdl, thread_hdl,
} }
} }
@ -62,21 +62,21 @@ mod tests {
let zero = Hash::default(); let zero = Hash::default();
let hist = Historian::new(&zero, None); let hist = Historian::new(&zero, None);
hist.sender.send(Signal::Tick).unwrap(); hist.input.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
hist.sender.send(Signal::Tick).unwrap(); hist.input.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
hist.sender.send(Signal::Tick).unwrap(); hist.input.send(Signal::Tick).unwrap();
let entry0 = hist.receiver.recv().unwrap(); let entry0 = hist.output.recv().unwrap();
let entry1 = hist.receiver.recv().unwrap(); let entry1 = hist.output.recv().unwrap();
let entry2 = hist.receiver.recv().unwrap(); let entry2 = hist.output.recv().unwrap();
assert_eq!(entry0.num_hashes, 0); assert_eq!(entry0.num_hashes, 0);
assert_eq!(entry1.num_hashes, 0); assert_eq!(entry1.num_hashes, 0);
assert_eq!(entry2.num_hashes, 0); assert_eq!(entry2.num_hashes, 0);
drop(hist.sender); drop(hist.input);
assert_eq!( assert_eq!(
hist.thread_hdl.join().unwrap(), hist.thread_hdl.join().unwrap(),
ExitReason::RecvDisconnected ExitReason::RecvDisconnected
@ -89,8 +89,8 @@ mod tests {
fn test_historian_closed_sender() { fn test_historian_closed_sender() {
let zero = Hash::default(); let zero = Hash::default();
let hist = Historian::new(&zero, None); let hist = Historian::new(&zero, None);
drop(hist.receiver); drop(hist.output);
hist.sender.send(Signal::Tick).unwrap(); hist.input.send(Signal::Tick).unwrap();
assert_eq!( assert_eq!(
hist.thread_hdl.join().unwrap(), hist.thread_hdl.join().unwrap(),
ExitReason::SendDisconnected ExitReason::SendDisconnected
@ -102,9 +102,9 @@ mod tests {
let zero = Hash::default(); let zero = Hash::default();
let hist = Historian::new(&zero, Some(20)); let hist = Historian::new(&zero, Some(20));
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));
hist.sender.send(Signal::Tick).unwrap(); hist.input.send(Signal::Tick).unwrap();
drop(hist.sender); drop(hist.input);
let entries: Vec<Entry> = hist.receiver.iter().collect(); let entries: Vec<Entry> = hist.output.iter().collect();
assert!(entries.len() > 1); assert!(entries.len() > 1);
// Ensure the ID is not the seed. // Ensure the ID is not the seed.