Pull channel functionality into record_stage

This makes record_stage consistent with the other stages. The stage
manages the channels. Anything else is in a standalone object. In
the case of the record_stage, that leaves almost nothing!
This commit is contained in:
Greg Fitzgerald 2018-05-23 15:49:59 -06:00
parent 75e69eecfa
commit c9113b381d
3 changed files with 95 additions and 108 deletions

View File

@ -6,7 +6,7 @@ use event::Event;
use packet;
use packet::SharedPackets;
use rayon::prelude::*;
use recorder::Signal;
use record_stage::Signal;
use result::Result;
use std::net::SocketAddr;
use std::sync::Arc;
@ -123,7 +123,7 @@ impl BankingStage {
//use event::Event;
//use hash::Hash;
//use record_stage::RecordStage;
//use recorder::Signal;
//use record_stage::Signal;
//use result::Result;
//use std::sync::mpsc::{channel, Sender};
//use std::sync::{Arc, Mutex};
@ -261,7 +261,7 @@ mod bench {
use event::Event;
use mint::Mint;
use packet::{to_packets, PacketRecycler};
use recorder::Signal;
use record_stage::Signal;
use signature::{KeyPair, KeyPairUtil};
use std::iter;
use std::sync::Arc;

View File

@ -1,57 +1,97 @@
//! The `record_stage` implements the Record stage of the TPU.
//! It manages a thread containing a Proof of History Recorder.
//! The `record_stage` module provides an object for generating a Proof of History.
//! It records Event items on behalf of its users. It continuously generates
//! new hashes, only stopping to check if it has been sent an Event item. It
//! tags each Event with an Entry, and sends it back. The Entry includes the
//! Event, the latest hash, and the number of hashes since the last event.
//! The resulting stream of entries represents ordered events in time.
use entry::Entry;
use event::Event;
use hash::Hash;
use recorder::{ExitReason, Recorder, Signal};
use recorder::Recorder;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread::{spawn, JoinHandle};
use std::time::{Duration, Instant};
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum Signal {
Tick,
Events(Vec<Event>),
}
#[derive(Debug, PartialEq, Eq)]
pub enum ExitReason {
RecvDisconnected,
SendDisconnected,
}
pub struct RecordStage {
pub entry_receiver: Receiver<Entry>,
pub thread_hdl: JoinHandle<ExitReason>,
}
impl RecordStage {
/// A background thread that will continue tagging received Event messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new(
event_receiver: Receiver<Signal>,
start_hash: &Hash,
tick_duration: Option<Duration>,
) -> Self {
let (entry_sender, entry_receiver) = channel();
let thread_hdl =
Self::create_recorder(*start_hash, tick_duration, event_receiver, entry_sender);
RecordStage {
entry_receiver,
thread_hdl,
}
}
let start_hash = start_hash.clone();
/// A background thread that will continue tagging received Event messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
fn create_recorder(
start_hash: Hash,
tick_duration: Option<Duration>,
receiver: Receiver<Signal>,
sender: Sender<Entry>,
) -> JoinHandle<ExitReason> {
spawn(move || {
let mut recorder = Recorder::new(receiver, sender, start_hash);
let thread_hdl = spawn(move || {
let mut recorder = Recorder::new(start_hash);
let duration_data = tick_duration.map(|dur| (Instant::now(), dur));
loop {
if let Err(err) = recorder.process_events(duration_data) {
if let Err(err) = Self::process_events(
&mut recorder,
duration_data,
&event_receiver,
&entry_sender,
) {
return err;
}
if duration_data.is_some() {
recorder.hash();
}
}
})
});
RecordStage {
entry_receiver,
thread_hdl,
}
}
pub fn receive(self: &Self) -> Result<Entry, TryRecvError> {
self.entry_receiver.try_recv()
pub fn process_events(
recorder: &mut Recorder,
duration_data: Option<(Instant, Duration)>,
receiver: &Receiver<Signal>,
sender: &Sender<Entry>,
) -> Result<(), ExitReason> {
loop {
if let Some((start_time, tick_duration)) = duration_data {
if let Some(entry) = recorder.tick(start_time, tick_duration) {
sender.send(entry).or(Err(ExitReason::SendDisconnected))?;
}
}
match receiver.try_recv() {
Ok(signal) => match signal {
Signal::Tick => {
let entry = recorder.record(vec![]);
sender.send(entry).or(Err(ExitReason::SendDisconnected))?;
}
Signal::Events(events) => {
let entry = recorder.record(events);
sender.send(entry).or(Err(ExitReason::SendDisconnected))?;
}
},
Err(TryRecvError::Empty) => return Ok(()),
Err(TryRecvError::Disconnected) => return Err(ExitReason::RecvDisconnected),
};
}
}
}
@ -59,6 +99,8 @@ impl RecordStage {
mod tests {
use super::*;
use ledger::Block;
use signature::{KeyPair, KeyPairUtil};
use std::sync::mpsc::channel;
use std::thread::sleep;
#[test]
@ -103,6 +145,21 @@ mod tests {
);
}
#[test]
fn test_events() {
let (input, signal_receiver) = channel();
let zero = Hash::default();
let record_stage = RecordStage::new(signal_receiver, &zero, None);
let alice_keypair = KeyPair::new();
let bob_pubkey = KeyPair::new().pubkey();
let event0 = Event::new_transaction(&alice_keypair, bob_pubkey, 1, zero);
let event1 = Event::new_transaction(&alice_keypair, bob_pubkey, 2, zero);
input.send(Signal::Events(vec![event0, event1])).unwrap();
drop(input);
let entries: Vec<_> = record_stage.entry_receiver.iter().collect();
assert_eq!(entries.len(), 1);
}
#[test]
#[ignore]
fn test_ticking_historian() {

View File

@ -1,41 +1,20 @@
//! The `recorder` module provides an object for generating a Proof of History.
//! It records Event items on behalf of its users. It continuously generates
//! new hashes, only stopping to check if it has been sent an Event item. It
//! tags each Event with an Entry, and sends it back. The Entry includes the
//! Event, the latest hash, and the number of hashes since the last event.
//! The resulting stream of entries represents ordered events in time.
//! It records Event items on behalf of its users.
use entry::Entry;
use event::Event;
use hash::{hash, Hash};
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::{Duration, Instant};
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum Signal {
Tick,
Events(Vec<Event>),
}
#[derive(Debug, PartialEq, Eq)]
pub enum ExitReason {
RecvDisconnected,
SendDisconnected,
}
pub struct Recorder {
sender: Sender<Entry>,
receiver: Receiver<Signal>,
last_hash: Hash,
num_hashes: u64,
num_ticks: u32,
}
impl Recorder {
pub fn new(receiver: Receiver<Signal>, sender: Sender<Entry>, last_hash: Hash) -> Self {
pub fn new(last_hash: Hash) -> Self {
Recorder {
receiver,
sender,
last_hash,
num_hashes: 0,
num_ticks: 0,
@ -47,66 +26,17 @@ impl Recorder {
self.num_hashes += 1;
}
pub fn record_entry(&mut self, events: Vec<Event>) -> Result<(), ExitReason> {
let entry = Entry::new_mut(&mut self.last_hash, &mut self.num_hashes, events);
self.sender
.send(entry)
.or(Err(ExitReason::SendDisconnected))?;
Ok(())
pub fn record(&mut self, events: Vec<Event>) -> Entry {
Entry::new_mut(&mut self.last_hash, &mut self.num_hashes, events)
}
pub fn process_events(
&mut self,
duration_data: Option<(Instant, Duration)>,
) -> Result<(), ExitReason> {
loop {
if let Some((start_time, tick_duration)) = duration_data {
if start_time.elapsed() > tick_duration * (self.num_ticks + 1) {
self.record_entry(vec![])?;
// TODO: don't let this overflow u32
self.num_ticks += 1;
}
}
match self.receiver.try_recv() {
Ok(signal) => match signal {
Signal::Tick => {
self.record_entry(vec![])?;
}
Signal::Events(events) => {
self.record_entry(events)?;
}
},
Err(TryRecvError::Empty) => return Ok(()),
Err(TryRecvError::Disconnected) => return Err(ExitReason::RecvDisconnected),
};
pub fn tick(&mut self, start_time: Instant, tick_duration: Duration) -> Option<Entry> {
if start_time.elapsed() > tick_duration * (self.num_ticks + 1) {
// TODO: don't let this overflow u32
self.num_ticks += 1;
Some(self.record(vec![]))
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use signature::{KeyPair, KeyPairUtil};
use std::sync::mpsc::channel;
#[test]
fn test_events() {
let (signal_sender, signal_receiver) = channel();
let (entry_sender, entry_receiver) = channel();
let zero = Hash::default();
let mut recorder = Recorder::new(signal_receiver, entry_sender, zero);
let alice_keypair = KeyPair::new();
let bob_pubkey = KeyPair::new().pubkey();
let event0 = Event::new_transaction(&alice_keypair, bob_pubkey, 1, zero);
let event1 = Event::new_transaction(&alice_keypair, bob_pubkey, 2, zero);
signal_sender
.send(Signal::Events(vec![event0, event1]))
.unwrap();
recorder.process_events(None).unwrap();
drop(recorder.sender);
let entries: Vec<_> = entry_receiver.iter().collect();
assert_eq!(entries.len(), 1);
}
}