From c9113b381d3a9a6230af8e89586cc6a6426031bf Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 23 May 2018 15:49:59 -0600 Subject: [PATCH] Pull channel functionality into record_stage This makes record_stage consistent with the other stages. The stage manages the channels. Anything else is in a standalone object. In the case of the record_stage, that leaves almost nothing! --- src/banking_stage.rs | 6 +-- src/record_stage.rs | 105 +++++++++++++++++++++++++++++++++---------- src/recorder.rs | 92 +++++-------------------------------- 3 files changed, 95 insertions(+), 108 deletions(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 606ee42f47..aa8fe52aae 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -6,7 +6,7 @@ use event::Event; use packet; use packet::SharedPackets; use rayon::prelude::*; -use recorder::Signal; +use record_stage::Signal; use result::Result; use std::net::SocketAddr; use std::sync::Arc; @@ -123,7 +123,7 @@ impl BankingStage { //use event::Event; //use hash::Hash; //use record_stage::RecordStage; -//use recorder::Signal; +//use record_stage::Signal; //use result::Result; //use std::sync::mpsc::{channel, Sender}; //use std::sync::{Arc, Mutex}; @@ -261,7 +261,7 @@ mod bench { use event::Event; use mint::Mint; use packet::{to_packets, PacketRecycler}; - use recorder::Signal; + use record_stage::Signal; use signature::{KeyPair, KeyPairUtil}; use std::iter; use std::sync::Arc; diff --git a/src/record_stage.rs b/src/record_stage.rs index 072983fd09..a2c5a86f7f 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -1,57 +1,97 @@ -//! The `record_stage` implements the Record stage of the TPU. -//! It manages a thread containing a Proof of History Recorder. +//! The `record_stage` module provides an object for generating a Proof of History. +//! It records Event items on behalf of its users. It continuously generates +//! new hashes, only stopping to check if it has been sent an Event item. It +//! tags each Event with an Entry, and sends it back. The Entry includes the +//! Event, the latest hash, and the number of hashes since the last event. +//! The resulting stream of entries represents ordered events in time. use entry::Entry; +use event::Event; use hash::Hash; -use recorder::{ExitReason, Recorder, Signal}; +use recorder::Recorder; use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::thread::{spawn, JoinHandle}; use std::time::{Duration, Instant}; +#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] +pub enum Signal { + Tick, + Events(Vec), +} + +#[derive(Debug, PartialEq, Eq)] +pub enum ExitReason { + RecvDisconnected, + SendDisconnected, +} + pub struct RecordStage { pub entry_receiver: Receiver, pub thread_hdl: JoinHandle, } impl RecordStage { + /// A background thread that will continue tagging received Event messages and + /// sending back Entry messages until either the receiver or sender channel is closed. pub fn new( event_receiver: Receiver, start_hash: &Hash, tick_duration: Option, ) -> Self { let (entry_sender, entry_receiver) = channel(); - let thread_hdl = - Self::create_recorder(*start_hash, tick_duration, event_receiver, entry_sender); - RecordStage { - entry_receiver, - thread_hdl, - } - } + let start_hash = start_hash.clone(); - /// A background thread that will continue tagging received Event messages and - /// sending back Entry messages until either the receiver or sender channel is closed. - fn create_recorder( - start_hash: Hash, - tick_duration: Option, - receiver: Receiver, - sender: Sender, - ) -> JoinHandle { - spawn(move || { - let mut recorder = Recorder::new(receiver, sender, start_hash); + let thread_hdl = spawn(move || { + let mut recorder = Recorder::new(start_hash); let duration_data = tick_duration.map(|dur| (Instant::now(), dur)); loop { - if let Err(err) = recorder.process_events(duration_data) { + if let Err(err) = Self::process_events( + &mut recorder, + duration_data, + &event_receiver, + &entry_sender, + ) { return err; } if duration_data.is_some() { recorder.hash(); } } - }) + }); + + RecordStage { + entry_receiver, + thread_hdl, + } } - pub fn receive(self: &Self) -> Result { - self.entry_receiver.try_recv() + pub fn process_events( + recorder: &mut Recorder, + duration_data: Option<(Instant, Duration)>, + receiver: &Receiver, + sender: &Sender, + ) -> Result<(), ExitReason> { + loop { + if let Some((start_time, tick_duration)) = duration_data { + if let Some(entry) = recorder.tick(start_time, tick_duration) { + sender.send(entry).or(Err(ExitReason::SendDisconnected))?; + } + } + match receiver.try_recv() { + Ok(signal) => match signal { + Signal::Tick => { + let entry = recorder.record(vec![]); + sender.send(entry).or(Err(ExitReason::SendDisconnected))?; + } + Signal::Events(events) => { + let entry = recorder.record(events); + sender.send(entry).or(Err(ExitReason::SendDisconnected))?; + } + }, + Err(TryRecvError::Empty) => return Ok(()), + Err(TryRecvError::Disconnected) => return Err(ExitReason::RecvDisconnected), + }; + } } } @@ -59,6 +99,8 @@ impl RecordStage { mod tests { use super::*; use ledger::Block; + use signature::{KeyPair, KeyPairUtil}; + use std::sync::mpsc::channel; use std::thread::sleep; #[test] @@ -103,6 +145,21 @@ mod tests { ); } + #[test] + fn test_events() { + let (input, signal_receiver) = channel(); + let zero = Hash::default(); + let record_stage = RecordStage::new(signal_receiver, &zero, None); + let alice_keypair = KeyPair::new(); + let bob_pubkey = KeyPair::new().pubkey(); + let event0 = Event::new_transaction(&alice_keypair, bob_pubkey, 1, zero); + let event1 = Event::new_transaction(&alice_keypair, bob_pubkey, 2, zero); + input.send(Signal::Events(vec![event0, event1])).unwrap(); + drop(input); + let entries: Vec<_> = record_stage.entry_receiver.iter().collect(); + assert_eq!(entries.len(), 1); + } + #[test] #[ignore] fn test_ticking_historian() { diff --git a/src/recorder.rs b/src/recorder.rs index 09e0d471e7..790c7ee77a 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -1,41 +1,20 @@ //! The `recorder` module provides an object for generating a Proof of History. -//! It records Event items on behalf of its users. It continuously generates -//! new hashes, only stopping to check if it has been sent an Event item. It -//! tags each Event with an Entry, and sends it back. The Entry includes the -//! Event, the latest hash, and the number of hashes since the last event. -//! The resulting stream of entries represents ordered events in time. +//! It records Event items on behalf of its users. use entry::Entry; use event::Event; use hash::{hash, Hash}; -use std::sync::mpsc::{Receiver, Sender, TryRecvError}; use std::time::{Duration, Instant}; -#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] -pub enum Signal { - Tick, - Events(Vec), -} - -#[derive(Debug, PartialEq, Eq)] -pub enum ExitReason { - RecvDisconnected, - SendDisconnected, -} - pub struct Recorder { - sender: Sender, - receiver: Receiver, last_hash: Hash, num_hashes: u64, num_ticks: u32, } impl Recorder { - pub fn new(receiver: Receiver, sender: Sender, last_hash: Hash) -> Self { + pub fn new(last_hash: Hash) -> Self { Recorder { - receiver, - sender, last_hash, num_hashes: 0, num_ticks: 0, @@ -47,66 +26,17 @@ impl Recorder { self.num_hashes += 1; } - pub fn record_entry(&mut self, events: Vec) -> Result<(), ExitReason> { - let entry = Entry::new_mut(&mut self.last_hash, &mut self.num_hashes, events); - self.sender - .send(entry) - .or(Err(ExitReason::SendDisconnected))?; - Ok(()) + pub fn record(&mut self, events: Vec) -> Entry { + Entry::new_mut(&mut self.last_hash, &mut self.num_hashes, events) } - pub fn process_events( - &mut self, - duration_data: Option<(Instant, Duration)>, - ) -> Result<(), ExitReason> { - loop { - if let Some((start_time, tick_duration)) = duration_data { - if start_time.elapsed() > tick_duration * (self.num_ticks + 1) { - self.record_entry(vec![])?; - // TODO: don't let this overflow u32 - self.num_ticks += 1; - } - } - - match self.receiver.try_recv() { - Ok(signal) => match signal { - Signal::Tick => { - self.record_entry(vec![])?; - } - Signal::Events(events) => { - self.record_entry(events)?; - } - }, - Err(TryRecvError::Empty) => return Ok(()), - Err(TryRecvError::Disconnected) => return Err(ExitReason::RecvDisconnected), - }; + pub fn tick(&mut self, start_time: Instant, tick_duration: Duration) -> Option { + if start_time.elapsed() > tick_duration * (self.num_ticks + 1) { + // TODO: don't let this overflow u32 + self.num_ticks += 1; + Some(self.record(vec![])) + } else { + None } } } - -#[cfg(test)] -mod tests { - use super::*; - use signature::{KeyPair, KeyPairUtil}; - use std::sync::mpsc::channel; - - #[test] - fn test_events() { - let (signal_sender, signal_receiver) = channel(); - let (entry_sender, entry_receiver) = channel(); - let zero = Hash::default(); - let mut recorder = Recorder::new(signal_receiver, entry_sender, zero); - let alice_keypair = KeyPair::new(); - let bob_pubkey = KeyPair::new().pubkey(); - let event0 = Event::new_transaction(&alice_keypair, bob_pubkey, 1, zero); - let event1 = Event::new_transaction(&alice_keypair, bob_pubkey, 2, zero); - signal_sender - .send(Signal::Events(vec![event0, event1])) - .unwrap(); - recorder.process_events(None).unwrap(); - - drop(recorder.sender); - let entries: Vec<_> = entry_receiver.iter().collect(); - assert_eq!(entries.len(), 1); - } -}