Sleep between events if PoH is disabled

This commit is contained in:
Greg Fitzgerald 2018-05-30 15:37:12 -06:00
parent a8e1c44663
commit 7aab7d2f82
2 changed files with 76 additions and 34 deletions

View File

@ -8,7 +8,7 @@
use entry::Entry; use entry::Entry;
use hash::Hash; use hash::Hash;
use recorder::Recorder; use recorder::Recorder;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError};
use std::thread::{Builder, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use transaction::Transaction; use transaction::Transaction;
@ -27,10 +27,29 @@ pub struct RecordStage {
impl RecordStage { impl RecordStage {
/// A background thread that will continue tagging received Event messages and /// A background thread that will continue tagging received Event messages and
/// sending back Entry messages until either the receiver or sender channel is closed. /// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new( pub fn new(signal_receiver: Receiver<Signal>, start_hash: &Hash) -> Self {
transaction_receiver: Receiver<Signal>, let (entry_sender, entry_receiver) = channel();
let start_hash = start_hash.clone();
let thread_hdl = Builder::new()
.name("solana-record-stage".to_string())
.spawn(move || {
let mut recorder = Recorder::new(start_hash);
let _ = Self::process_signals(&mut recorder, &signal_receiver, &entry_sender);
})
.unwrap();
RecordStage {
entry_receiver,
thread_hdl,
}
}
/// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`.
pub fn new_with_clock(
signal_receiver: Receiver<Signal>,
start_hash: &Hash, start_hash: &Hash,
tick_duration: Option<Duration>, tick_duration: Duration,
) -> Self { ) -> Self {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let start_hash = start_hash.clone(); let start_hash = start_hash.clone();
@ -39,19 +58,18 @@ impl RecordStage {
.name("solana-record-stage".to_string()) .name("solana-record-stage".to_string())
.spawn(move || { .spawn(move || {
let mut recorder = Recorder::new(start_hash); let mut recorder = Recorder::new(start_hash);
let duration_data = tick_duration.map(|dur| (Instant::now(), dur)); let start_time = Instant::now();
loop { loop {
if let Err(_) = Self::process_transactions( if let Err(_) = Self::try_process_signals(
&mut recorder, &mut recorder,
duration_data, start_time,
&transaction_receiver, tick_duration,
&signal_receiver,
&entry_sender, &entry_sender,
) { ) {
return; return;
} }
if duration_data.is_some() { recorder.hash();
recorder.hash();
}
} }
}) })
.unwrap(); .unwrap();
@ -62,29 +80,46 @@ impl RecordStage {
} }
} }
pub fn process_transactions( fn process_signal(
signal: Signal,
recorder: &mut Recorder,
sender: &Sender<Entry>,
) -> Result<(), ()> {
let txs = if let Signal::Events(txs) = signal {
txs
} else {
vec![]
};
let entry = recorder.record(txs);
sender.send(entry).map_err(|_| ())
}
fn process_signals(
recorder: &mut Recorder, recorder: &mut Recorder,
duration_data: Option<(Instant, Duration)>,
receiver: &Receiver<Signal>, receiver: &Receiver<Signal>,
sender: &Sender<Entry>, sender: &Sender<Entry>,
) -> Result<(), ()> { ) -> Result<(), ()> {
loop { loop {
if let Some((start_time, tick_duration)) = duration_data { match receiver.recv() {
if let Some(entry) = recorder.tick(start_time, tick_duration) { Ok(signal) => Self::process_signal(signal, recorder, sender)?,
sender.send(entry).or(Err(()))?; Err(RecvError) => return Err(()),
} }
}
}
fn try_process_signals(
recorder: &mut Recorder,
start_time: Instant,
tick_duration: Duration,
receiver: &Receiver<Signal>,
sender: &Sender<Entry>,
) -> Result<(), ()> {
loop {
if let Some(entry) = recorder.tick(start_time, tick_duration) {
sender.send(entry).or(Err(()))?;
} }
match receiver.try_recv() { match receiver.try_recv() {
Ok(signal) => match signal { Ok(signal) => Self::process_signal(signal, recorder, sender)?,
Signal::Tick => {
let entry = recorder.record(vec![]);
sender.send(entry).or(Err(()))?;
}
Signal::Events(transactions) => {
let entry = recorder.record(transactions);
sender.send(entry).or(Err(()))?;
}
},
Err(TryRecvError::Empty) => return Ok(()), Err(TryRecvError::Empty) => return Ok(()),
Err(TryRecvError::Disconnected) => return Err(()), Err(TryRecvError::Disconnected) => return Err(()),
}; };
@ -104,7 +139,7 @@ mod tests {
fn test_historian() { fn test_historian() {
let (tx_sender, tx_receiver) = channel(); let (tx_sender, tx_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero, None); let record_stage = RecordStage::new(tx_receiver, &zero);
tx_sender.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
@ -130,7 +165,7 @@ mod tests {
fn test_historian_closed_sender() { fn test_historian_closed_sender() {
let (tx_sender, tx_receiver) = channel(); let (tx_sender, tx_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero, None); let record_stage = RecordStage::new(tx_receiver, &zero);
drop(record_stage.entry_receiver); drop(record_stage.entry_receiver);
tx_sender.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
assert_eq!(record_stage.thread_hdl.join().unwrap(), ()); assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
@ -140,7 +175,7 @@ mod tests {
fn test_transactions() { fn test_transactions() {
let (tx_sender, signal_receiver) = channel(); let (tx_sender, signal_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let record_stage = RecordStage::new(signal_receiver, &zero, None); let record_stage = RecordStage::new(signal_receiver, &zero);
let alice_keypair = KeyPair::new(); let alice_keypair = KeyPair::new();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero); let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero);
@ -153,10 +188,11 @@ mod tests {
#[test] #[test]
#[ignore] #[ignore]
fn test_ticking_historian() { fn test_clock() {
let (tx_sender, tx_receiver) = channel(); let (tx_sender, tx_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let record_stage = RecordStage::new(tx_receiver, &zero, Some(Duration::from_millis(20))); let record_stage =
RecordStage::new_with_clock(tx_receiver, &zero, Duration::from_millis(20));
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
tx_sender.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
drop(tx_sender); drop(tx_sender);

View File

@ -46,8 +46,14 @@ impl Tpu {
packet_recycler.clone(), packet_recycler.clone(),
); );
let record_stage = let record_stage = match tick_duration {
RecordStage::new(banking_stage.signal_receiver, &start_hash, tick_duration); Some(tick_duration) => RecordStage::new_with_clock(
banking_stage.signal_receiver,
&start_hash,
tick_duration,
),
None => RecordStage::new(banking_stage.signal_receiver, &start_hash),
};
let write_stage = WriteStage::new( let write_stage = WriteStage::new(
bank.clone(), bank.clone(),