diff --git a/src/record_stage.rs b/src/record_stage.rs index f8523856b..6767247d5 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -8,7 +8,7 @@ use entry::Entry; use hash::Hash; use recorder::Recorder; -use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; +use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError}; use std::thread::{Builder, JoinHandle}; use std::time::{Duration, Instant}; use transaction::Transaction; @@ -27,10 +27,29 @@ pub struct RecordStage { impl RecordStage { /// A background thread that will continue tagging received Event messages and /// sending back Entry messages until either the receiver or sender channel is closed. - pub fn new( - transaction_receiver: Receiver, + pub fn new(signal_receiver: Receiver, start_hash: &Hash) -> Self { + let (entry_sender, entry_receiver) = channel(); + let start_hash = start_hash.clone(); + + let thread_hdl = Builder::new() + .name("solana-record-stage".to_string()) + .spawn(move || { + let mut recorder = Recorder::new(start_hash); + let _ = Self::process_signals(&mut recorder, &signal_receiver, &entry_sender); + }) + .unwrap(); + + RecordStage { + entry_receiver, + thread_hdl, + } + } + + /// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`. + pub fn new_with_clock( + signal_receiver: Receiver, start_hash: &Hash, - tick_duration: Option, + tick_duration: Duration, ) -> Self { let (entry_sender, entry_receiver) = channel(); let start_hash = start_hash.clone(); @@ -39,19 +58,18 @@ impl RecordStage { .name("solana-record-stage".to_string()) .spawn(move || { let mut recorder = Recorder::new(start_hash); - let duration_data = tick_duration.map(|dur| (Instant::now(), dur)); + let start_time = Instant::now(); loop { - if let Err(_) = Self::process_transactions( + if let Err(_) = Self::try_process_signals( &mut recorder, - duration_data, - &transaction_receiver, + start_time, + tick_duration, + &signal_receiver, &entry_sender, ) { return; } - if duration_data.is_some() { - recorder.hash(); - } + recorder.hash(); } }) .unwrap(); @@ -62,29 +80,46 @@ impl RecordStage { } } - pub fn process_transactions( + fn process_signal( + signal: Signal, + recorder: &mut Recorder, + sender: &Sender, + ) -> Result<(), ()> { + let txs = if let Signal::Events(txs) = signal { + txs + } else { + vec![] + }; + let entry = recorder.record(txs); + sender.send(entry).map_err(|_| ()) + } + + fn process_signals( recorder: &mut Recorder, - duration_data: Option<(Instant, Duration)>, receiver: &Receiver, sender: &Sender, ) -> Result<(), ()> { loop { - if let Some((start_time, tick_duration)) = duration_data { - if let Some(entry) = recorder.tick(start_time, tick_duration) { - sender.send(entry).or(Err(()))?; - } + match receiver.recv() { + Ok(signal) => Self::process_signal(signal, recorder, sender)?, + Err(RecvError) => return Err(()), + } + } + } + + fn try_process_signals( + recorder: &mut Recorder, + start_time: Instant, + tick_duration: Duration, + receiver: &Receiver, + sender: &Sender, + ) -> Result<(), ()> { + loop { + if let Some(entry) = recorder.tick(start_time, tick_duration) { + sender.send(entry).or(Err(()))?; } match receiver.try_recv() { - Ok(signal) => match signal { - Signal::Tick => { - let entry = recorder.record(vec![]); - sender.send(entry).or(Err(()))?; - } - Signal::Events(transactions) => { - let entry = recorder.record(transactions); - sender.send(entry).or(Err(()))?; - } - }, + Ok(signal) => Self::process_signal(signal, recorder, sender)?, Err(TryRecvError::Empty) => return Ok(()), Err(TryRecvError::Disconnected) => return Err(()), }; @@ -104,7 +139,7 @@ mod tests { fn test_historian() { let (tx_sender, tx_receiver) = channel(); let zero = Hash::default(); - let record_stage = RecordStage::new(tx_receiver, &zero, None); + let record_stage = RecordStage::new(tx_receiver, &zero); tx_sender.send(Signal::Tick).unwrap(); sleep(Duration::new(0, 1_000_000)); @@ -130,7 +165,7 @@ mod tests { fn test_historian_closed_sender() { let (tx_sender, tx_receiver) = channel(); let zero = Hash::default(); - let record_stage = RecordStage::new(tx_receiver, &zero, None); + let record_stage = RecordStage::new(tx_receiver, &zero); drop(record_stage.entry_receiver); tx_sender.send(Signal::Tick).unwrap(); assert_eq!(record_stage.thread_hdl.join().unwrap(), ()); @@ -140,7 +175,7 @@ mod tests { fn test_transactions() { let (tx_sender, signal_receiver) = channel(); let zero = Hash::default(); - let record_stage = RecordStage::new(signal_receiver, &zero, None); + let record_stage = RecordStage::new(signal_receiver, &zero); let alice_keypair = KeyPair::new(); let bob_pubkey = KeyPair::new().pubkey(); let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero); @@ -153,10 +188,11 @@ mod tests { #[test] #[ignore] - fn test_ticking_historian() { + fn test_clock() { let (tx_sender, tx_receiver) = channel(); let zero = Hash::default(); - let record_stage = RecordStage::new(tx_receiver, &zero, Some(Duration::from_millis(20))); + let record_stage = + RecordStage::new_with_clock(tx_receiver, &zero, Duration::from_millis(20)); sleep(Duration::from_millis(900)); tx_sender.send(Signal::Tick).unwrap(); drop(tx_sender); diff --git a/src/tpu.rs b/src/tpu.rs index 9e2e42c32..094de1f11 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -46,8 +46,14 @@ impl Tpu { packet_recycler.clone(), ); - let record_stage = - RecordStage::new(banking_stage.signal_receiver, &start_hash, tick_duration); + let record_stage = match tick_duration { + Some(tick_duration) => RecordStage::new_with_clock( + banking_stage.signal_receiver, + &start_hash, + tick_duration, + ), + None => RecordStage::new(banking_stage.signal_receiver, &start_hash), + }; let write_stage = WriteStage::new( bank.clone(),