diff --git a/src/bank.rs b/src/bank.rs index a7b6fb3b33..db2383a00f 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -20,6 +20,7 @@ use mint::Mint; use native_loader; use payment_plan::Payment; use poh_recorder::PohRecorder; +use poh_service::NUM_TICKS_PER_SECOND; use rayon::prelude::*; use rpc::RpcSignatureStatus; use signature::Keypair; @@ -49,7 +50,6 @@ use window::WINDOW_SIZE; /// but requires clients to update its `last_id` more frequently. Raising the value /// lengthens the time a client must wait to be certain a missing transaction will /// not be processed by the network. -pub const NUM_TICKS_PER_SECOND: usize = 10; pub const MAX_ENTRY_IDS: usize = NUM_TICKS_PER_SECOND * 120; pub const VERIFY_BLOCK_SIZE: usize = 16; diff --git a/src/banking_stage.rs b/src/banking_stage.rs index daf7bf6e93..ed6cc92b37 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -2,7 +2,7 @@ //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. -use bank::{Bank, NUM_TICKS_PER_SECOND}; +use bank::Bank; use bincode::deserialize; use counter::Counter; use entry::Entry; @@ -10,15 +10,15 @@ use hash::Hash; use log::Level; use packet::Packets; use poh_recorder::{PohRecorder, PohRecorderError}; +use poh_service::{Config, PohService}; use rayon::prelude::*; use result::{Error, Result}; use service::Service; use sigverify_stage::VerifiedPackets; use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::{Arc, Mutex}; -use std::thread::sleep; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; @@ -38,23 +38,9 @@ pub const NUM_THREADS: usize = 10; pub struct BankingStage { /// Handle to the stage's thread. bank_thread_hdls: Vec>>, - tick_producer: JoinHandle>, + poh_service: PohService, } -pub enum Config { - /// * `Tick` - Run full PoH thread. Tick is a rough estimate of how many hashes to roll before transmitting a new entry. - Tick(usize), - /// * `Sleep`- Low power mode. Sleep is a rough estimate of how long to sleep before rolling 1 poh once and producing 1 - /// tick. - Sleep(Duration), -} - -impl Default for Config { - fn default() -> Config { - // TODO: Change this to Tick to enable PoH - Config::Sleep(Duration::from_millis(1000 / NUM_TICKS_PER_SECOND as u64)) - } -} impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. pub fn new( @@ -67,49 +53,28 @@ impl BankingStage { ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); - let poh = PohRecorder::new( + let poh_recorder = PohRecorder::new( bank.clone(), entry_sender, *last_entry_id, tick_height, max_tick_height, + false, + vec![], ); - let tick_poh = poh.clone(); - // Tick producer is a headless producer, so when it exits it should notify the banking stage. - // Since channel are not used to talk between these threads an AtomicBool is used as a - // signal. - let poh_exit = Arc::new(AtomicBool::new(false)); - let banking_exit = poh_exit.clone(); + // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its last_id is registered with the bank. - let tick_producer = Builder::new() - .name("solana-banking-stage-tick_producer".to_string()) - .spawn(move || { - let mut tick_poh_ = tick_poh; - let return_value = match Self::tick_producer(&mut tick_poh_, &config, &poh_exit) { - Err(Error::SendError) => Some(BankingStageReturnType::ChannelDisconnected), - Err(e) => { - error!( - "solana-banking-stage-tick_producer unexpected error {:?}", - e - ); - None - } - Ok(x) => x, - }; - debug!("tick producer exiting"); - poh_exit.store(true, Ordering::Relaxed); - return_value - }).unwrap(); + let poh_service = PohService::new(poh_recorder.clone(), config); // Many banks that process transactions in parallel. let bank_thread_hdls: Vec>> = (0..NUM_THREADS) .map(|_| { let thread_bank = bank.clone(); let thread_verified_receiver = shared_verified_receiver.clone(); - let thread_poh = poh.clone(); - let thread_banking_exit = banking_exit.clone(); + let thread_poh_recorder = poh_recorder.clone(); + let thread_banking_exit = poh_service.poh_exit.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -117,7 +82,7 @@ impl BankingStage { if let Err(e) = Self::process_packets( &thread_bank, &thread_verified_receiver, - &thread_poh, + &thread_poh_recorder, ) { debug!("got error {:?}", e); match e { @@ -138,7 +103,6 @@ impl BankingStage { } } if thread_banking_exit.load(Ordering::Relaxed) { - debug!("tick service exited"); break None; } }; @@ -150,7 +114,7 @@ impl BankingStage { ( BankingStage { bank_thread_hdls, - tick_producer, + poh_service, }, entry_receiver, ) @@ -168,47 +132,6 @@ impl BankingStage { }).collect() } - fn tick_producer( - poh: &mut PohRecorder, - config: &Config, - poh_exit: &AtomicBool, - ) -> Result> { - loop { - match *config { - Config::Tick(num) => { - for _ in 0..num { - match poh.hash() { - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => { - return Ok(Some(BankingStageReturnType::LeaderRotation)); - } - Err(e) => { - return Err(e); - } - _ => (), - } - } - } - Config::Sleep(duration) => { - sleep(duration); - } - } - match poh.tick() { - Ok(height) if Some(height) == poh.max_tick_height => { - // CASE 1: We were successful in recording the last tick, so exit - return Ok(Some(BankingStageReturnType::LeaderRotation)); - } - Ok(_) => (), - Err(e) => { - return Err(e); - } - }; - if poh_exit.load(Ordering::Relaxed) { - debug!("tick service exited"); - return Ok(None); - } - } - } - fn process_transactions( bank: &Arc, transactions: &[Transaction], @@ -306,9 +229,16 @@ impl Service for BankingStage { } } - let tick_return_value = self.tick_producer.join()?; - if tick_return_value.is_some() { - return_value = tick_return_value; + let poh_return_value = self.poh_service.join()?; + match poh_return_value { + Ok(_) => (), + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => { + return_value = Some(BankingStageReturnType::LeaderRotation); + } + Err(Error::SendError) => { + return_value = Some(BankingStageReturnType::ChannelDisconnected); + } + Err(_) => (), } Ok(return_value) diff --git a/src/lib.rs b/src/lib.rs index 6f3f0f1802..5a4b20c238 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,7 @@ pub mod packet; pub mod payment_plan; pub mod poh; pub mod poh_recorder; +pub mod poh_service; pub mod recvmmsg; pub mod replicate_stage; pub mod replicator; diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs index c525759bda..96acf66e1f 100644 --- a/src/poh_recorder.rs +++ b/src/poh_recorder.rs @@ -12,11 +12,14 @@ use transaction::Transaction; #[derive(Debug, PartialEq, Eq, Clone)] pub enum PohRecorderError { + InvalidCallingObject, MaxHeightReached, } #[derive(Clone)] pub struct PohRecorder { + is_virtual: bool, + virtual_tick_entries: Arc>>, poh: Arc>, bank: Arc, sender: Sender>, @@ -27,6 +30,51 @@ pub struct PohRecorder { } impl PohRecorder { + pub fn hash(&self) -> Result<()> { + // TODO: amortize the cost of this lock by doing the loop in here for + // some min amount of hashes + let mut poh = self.poh.lock().unwrap(); + if self.is_max_tick_height_reached(&*poh) { + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) + } else { + poh.hash(); + Ok(()) + } + } + + pub fn tick(&mut self) -> Result<()> { + // Register and send the entry out while holding the lock if the max PoH height + // hasn't been reached. + // This guarantees PoH order and Entry production and banks LastId queue is the same + let mut poh = self.poh.lock().unwrap(); + if self.is_max_tick_height_reached(&*poh) { + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) + } else if self.is_virtual { + self.generate_and_store_tick(&mut *poh); + Ok(()) + } else { + self.register_and_send_tick(&mut *poh)?; + Ok(()) + } + } + + pub fn record(&self, mixin: Hash, txs: Vec) -> Result<()> { + if self.is_virtual { + return Err(Error::PohRecorderError( + PohRecorderError::InvalidCallingObject, + )); + } + // Register and send the entry out while holding the lock. + // This guarantees PoH order and Entry production and banks LastId queue is the same. + let mut poh = self.poh.lock().unwrap(); + if self.is_max_tick_height_reached(&*poh) { + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) + } else { + self.record_and_send_txs(&mut *poh, mixin, txs)?; + Ok(()) + } + } + /// A recorder to synchronize PoH with the following data structures /// * bank - the LastId's queue is updated on `tick` and `record` events /// * sender - the Entry channel that outputs to the ledger @@ -36,54 +84,31 @@ impl PohRecorder { last_entry_id: Hash, tick_height: u64, max_tick_height: Option, + is_virtual: bool, + virtual_tick_entries: Vec, ) -> Self { let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height))); + let virtual_tick_entries = Arc::new(Mutex::new(virtual_tick_entries)); PohRecorder { poh, bank, sender, max_tick_height, + is_virtual, + virtual_tick_entries, } } - pub fn hash(&self) -> Result<()> { - // TODO: amortize the cost of this lock by doing the loop in here for - // some min amount of hashes - let mut poh = self.poh.lock().unwrap(); - if self.check_max_tick_height_reached(&*poh) { - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) - } else { - poh.hash(); - Ok(()) + fn generate_tick_entry(&self, poh: &mut Poh) -> Entry { + let tick = poh.tick(); + Entry { + num_hashes: tick.num_hashes, + id: tick.id, + transactions: vec![], } } - pub fn tick(&mut self) -> Result { - // Register and send the entry out while holding the lock if the max PoH height - // hasn't been reached. - // This guarantees PoH order and Entry production and banks LastId queue is the same - let mut poh = self.poh.lock().unwrap(); - if self.check_max_tick_height_reached(&*poh) { - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) - } else { - self.register_and_send_tick(&mut *poh)?; - Ok(poh.tick_height) - } - } - - pub fn record(&self, mixin: Hash, txs: Vec) -> Result<()> { - // Register and send the entry out while holding the lock. - // This guarantees PoH order and Entry production and banks LastId queue is the same. - let mut poh = self.poh.lock().unwrap(); - if self.check_max_tick_height_reached(&*poh) { - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) - } else { - self.record_and_send_txs(&mut *poh, mixin, txs)?; - Ok(()) - } - } - - fn check_max_tick_height_reached(&self, poh: &Poh) -> bool { + fn is_max_tick_height_reached(&self, poh: &Poh) -> bool { if let Some(max_tick_height) = self.max_tick_height { poh.tick_height >= max_tick_height } else { @@ -103,15 +128,15 @@ impl PohRecorder { Ok(()) } + fn generate_and_store_tick(&self, poh: &mut Poh) { + let tick_entry = self.generate_tick_entry(poh); + self.virtual_tick_entries.lock().unwrap().push(tick_entry); + } + fn register_and_send_tick(&self, poh: &mut Poh) -> Result<()> { - let tick = poh.tick(); - self.bank.register_entry_id(&tick.id); - let entry = Entry { - num_hashes: tick.num_hashes, - id: tick.id, - transactions: vec![], - }; - self.sender.send(vec![entry])?; + let tick_entry = self.generate_tick_entry(poh); + self.bank.register_entry_id(&tick_entry.id); + self.sender.send(vec![tick_entry])?; Ok(()) } } @@ -131,7 +156,8 @@ mod tests { let bank = Arc::new(Bank::new(&mint)); let last_id = bank.last_id(); let (entry_sender, entry_receiver) = channel(); - let mut poh_recorder = PohRecorder::new(bank, entry_sender, last_id, 0, None); + let mut poh_recorder = + PohRecorder::new(bank, entry_sender, last_id, 0, None, false, vec![]); //send some data let h1 = hash(b"hello world!"); diff --git a/src/poh_service.rs b/src/poh_service.rs new file mode 100644 index 0000000000..a9d53a464d --- /dev/null +++ b/src/poh_service.rs @@ -0,0 +1,94 @@ +//! The `poh_service` module implements a service that records the passing of +//! "ticks", a measure of time in the PoH stream + +use poh_recorder::PohRecorder; +use result::Result; +use service::Service; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread::sleep; +use std::thread::{self, Builder, JoinHandle}; +use std::time::Duration; +pub const NUM_TICKS_PER_SECOND: usize = 10; + +#[derive(Copy, Clone)] +pub enum Config { + /// * `Tick` - Run full PoH thread. Tick is a rough estimate of how many hashes to roll before transmitting a new entry. + Tick(usize), + /// * `Sleep`- Low power mode. Sleep is a rough estimate of how long to sleep before rolling 1 poh once and producing 1 + /// tick. + Sleep(Duration), +} + +impl Default for Config { + fn default() -> Config { + // TODO: Change this to Tick to enable PoH + Config::Sleep(Duration::from_millis(1000 / NUM_TICKS_PER_SECOND as u64)) + } +} + +pub struct PohService { + tick_producer: JoinHandle>, + pub poh_exit: Arc, +} + +impl PohService { + pub fn exit(&self) -> () { + self.poh_exit.store(true, Ordering::Relaxed); + } + + pub fn close(self) -> thread::Result> { + self.exit(); + self.join() + } + + pub fn new(poh_recorder: PohRecorder, config: Config) -> Self { + // PohService is a headless producer, so when it exits it should notify the banking stage. + // Since channel are not used to talk between these threads an AtomicBool is used as a + // signal. + let poh_exit = Arc::new(AtomicBool::new(false)); + let poh_exit_ = poh_exit.clone(); + // Single thread to generate ticks + let tick_producer = Builder::new() + .name("solana-poh-service-tick_producer".to_string()) + .spawn(move || { + let mut poh_recorder_ = poh_recorder; + let return_value = Self::tick_producer(&mut poh_recorder_, config, &poh_exit_); + poh_exit_.store(true, Ordering::Relaxed); + return_value + }).unwrap(); + + PohService { + tick_producer, + poh_exit, + } + } + + fn tick_producer(poh: &mut PohRecorder, config: Config, poh_exit: &AtomicBool) -> Result<()> { + loop { + match config { + Config::Tick(num) => { + for _ in 0..num { + poh.hash()?; + } + } + Config::Sleep(duration) => { + sleep(duration); + } + } + poh.tick()?; + if poh_exit.load(Ordering::Relaxed) { + debug!("tick service exited"); + return Ok(()); + } + } + } +} + +impl Service for PohService { + type JoinReturnType = Result<()>; + + fn join(self) -> thread::Result> { + self.tick_producer.join() + } +} diff --git a/src/tpu.rs b/src/tpu.rs index 2656dc7cf4..66fe9a902d 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -26,13 +26,14 @@ //! ``` use bank::Bank; -use banking_stage::{BankingStage, BankingStageReturnType, Config}; +use banking_stage::{BankingStage, BankingStageReturnType}; use cluster_info::ClusterInfo; use entry::Entry; use fetch_stage::FetchStage; use hash::Hash; use leader_vote_stage::LeaderVoteStage; use ledger_write_stage::LedgerWriteStage; +use poh_service::Config; use service::Service; use signature::Keypair; use sigverify_stage::SigVerifyStage;