diff --git a/src/record_stage.rs b/src/record_stage.rs index ab0dca8d1a..7aeb2fe0b3 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -5,14 +5,15 @@ //! Transaction, the latest hash, and the number of hashes since the last transaction. //! The resulting stream of entries represents ordered transactions in time. +use bank::Bank; use counter::Counter; use entry::Entry; -use hash::Hash; use log::Level; use recorder::Recorder; use service::Service; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError}; +use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; use transaction::Transaction; @@ -30,18 +31,15 @@ pub struct RecordStage { impl RecordStage { /// A background thread that will continue tagging received Transaction messages and /// sending back Entry messages until either the receiver or sender channel is closed. - pub fn new( - signal_receiver: Receiver, - start_hash: &Hash, - ) -> (Self, Receiver>) { + pub fn new(signal_receiver: Receiver, bank: Arc) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); - let start_hash = *start_hash; + let start_hash = bank.last_id(); let thread_hdl = Builder::new() .name("solana-record-stage".to_string()) .spawn(move || { let mut recorder = Recorder::new(start_hash); - let _ = Self::process_signals(&mut recorder, &signal_receiver, &entry_sender); + let _ = Self::process_signals(&mut recorder, &signal_receiver, bank, &entry_sender); }).unwrap(); (RecordStage { thread_hdl }, entry_receiver) @@ -50,11 +48,11 @@ impl RecordStage { /// Same as `RecordStage::new`, but will automatically produce entries every `tick_duration`. pub fn new_with_clock( signal_receiver: Receiver, - start_hash: &Hash, + bank: Arc, tick_duration: Duration, ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); - let start_hash = *start_hash; + let start_hash = bank.last_id(); let thread_hdl = Builder::new() .name("solana-record-stage".to_string()) @@ -67,6 +65,7 @@ impl RecordStage { start_time, tick_duration, &signal_receiver, + bank.clone(), &entry_sender, ).is_err() { @@ -81,6 +80,7 @@ impl RecordStage { fn process_signal( signal: Signal, + bank: &Arc, recorder: &mut Recorder, sender: &Sender>, ) -> Result<(), ()> { @@ -91,6 +91,13 @@ impl RecordStage { }; let txs_len = txs.len(); let entries = recorder.record(txs); + + for entry in entries.iter() { + if !entry.has_more { + bank.register_entry_id(&entry.id); + } + } + let entries_len = entries.len(); sender.send(entries).or(Err(()))?; @@ -103,11 +110,12 @@ impl RecordStage { fn process_signals( recorder: &mut Recorder, receiver: &Receiver, + bank: Arc, sender: &Sender>, ) -> Result<(), ()> { loop { match receiver.recv() { - Ok(signal) => Self::process_signal(signal, recorder, sender)?, + Ok(signal) => Self::process_signal(signal, &bank, recorder, sender)?, Err(RecvError) => return Err(()), } } @@ -118,6 +126,7 @@ impl RecordStage { start_time: Instant, tick_duration: Duration, receiver: &Receiver, + bank: Arc, sender: &Sender>, ) -> Result<(), ()> { loop { @@ -125,7 +134,7 @@ impl RecordStage { sender.send(vec![entry]).or(Err(()))?; } match receiver.try_recv() { - Ok(signal) => Self::process_signal(signal, recorder, sender)?, + Ok(signal) => Self::process_signal(signal, &bank, recorder, sender)?, Err(TryRecvError::Empty) => return Ok(()), Err(TryRecvError::Disconnected) => return Err(()), }; @@ -144,16 +153,21 @@ impl Service for RecordStage { #[cfg(test)] mod tests { use super::*; + use bank::Bank; use ledger::Block; + use mint::Mint; use signature::{Keypair, KeypairUtil}; use std::sync::mpsc::channel; + use std::sync::Arc; use std::thread::sleep; #[test] fn test_historian() { let (tx_sender, tx_receiver) = channel(); - let zero = Hash::default(); - let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, &zero); + let mint = Mint::new(1234); + let bank = Arc::new(Bank::new(&mint)); + let zero = bank.last_id(); + let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank); tx_sender.send(Signal::Tick).unwrap(); sleep(Duration::new(0, 1_000_000)); @@ -178,8 +192,10 @@ mod tests { #[test] fn test_historian_closed_sender() { let (tx_sender, tx_receiver) = channel(); - let zero = Hash::default(); - let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, &zero); + let mint = Mint::new(1234); + let bank = Arc::new(Bank::new(&mint)); + let zero = bank.last_id(); + let (record_stage, entry_receiver) = RecordStage::new(tx_receiver, bank); drop(entry_receiver); tx_sender.send(Signal::Tick).unwrap(); assert_eq!(record_stage.thread_hdl.join().unwrap(), ()); @@ -188,8 +204,10 @@ mod tests { #[test] fn test_transactions() { let (tx_sender, signal_receiver) = channel(); - let zero = Hash::default(); - let (_record_stage, entry_receiver) = RecordStage::new(signal_receiver, &zero); + let mint = Mint::new(1234); + let bank = Arc::new(Bank::new(&mint)); + let zero = bank.last_id(); + let (_record_stage, entry_receiver) = RecordStage::new(signal_receiver, bank); let alice_keypair = Keypair::new(); let bob_pubkey = Keypair::new().pubkey(); let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero); @@ -205,9 +223,11 @@ mod tests { #[test] fn test_clock() { let (tx_sender, tx_receiver) = channel(); - let zero = Hash::default(); + let mint = Mint::new(1234); + let bank = Arc::new(Bank::new(&mint)); + let zero = bank.last_id(); let (_record_stage, entry_receiver) = - RecordStage::new_with_clock(tx_receiver, &zero, Duration::from_millis(20)); + RecordStage::new_with_clock(tx_receiver, bank, Duration::from_millis(20)); sleep(Duration::from_millis(900)); tx_sender.send(Signal::Tick).unwrap(); drop(tx_sender); diff --git a/src/tpu.rs b/src/tpu.rs index 9b9bc7c15f..8846600202 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -76,9 +76,9 @@ impl Tpu { let (record_stage, entry_receiver) = match tick_duration { Some(tick_duration) => { - RecordStage::new_with_clock(signal_receiver, &bank.last_id(), tick_duration) + RecordStage::new_with_clock(signal_receiver, bank.clone(), tick_duration) } - None => RecordStage::new(signal_receiver, &bank.last_id()), + None => RecordStage::new(signal_receiver, bank.clone()), }; let (write_stage, blob_receiver) = WriteStage::new( diff --git a/src/write_stage.rs b/src/write_stage.rs index f5af5a9f4e..cc7beb6daa 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -30,7 +30,6 @@ impl WriteStage { /// continuosly broadcast blobs of entries out pub fn write_and_send_entries( crdt: &Arc>, - bank: &Arc, ledger_writer: &mut LedgerWriter, blob_sender: &BlobSender, blob_recycler: &BlobRecycler, @@ -43,12 +42,6 @@ impl WriteStage { ledger_writer.write_entries(entries.clone())?; - for entry in &entries { - if !entry.has_more { - bank.register_entry_id(&entry.id); - } - } - inc_new_counter_info!("write_stage-write_entries", entries.len()); //TODO(anatoly): real stake based voting needs to change this @@ -96,7 +89,6 @@ impl WriteStage { loop { if let Err(e) = Self::write_and_send_entries( &crdt, - &bank, &mut ledger_writer, &blob_sender, &blob_recycler,