From c65046e1a28e220fa29020164fe9c10ed44292da Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Sun, 24 Feb 2019 08:59:49 -0800 Subject: [PATCH] Use PohRecorder as the Poh synchronization point. (#2926) Cleanup poh_recorder and poh_service. * ticks are sent only if poh.tick_height > WorkingBank::min_tick_height and <= WorkingBank::max_tick_height * entries are recorded only if poh.tick_height >= WorkingBank::min_tick_height and < WorkingBank::max_tick_height --- src/banking_stage.rs | 80 ++++----- src/poh.rs | 2 +- src/poh_recorder.rs | 418 ++++++++++++++++++++++++++++++------------- src/poh_service.rs | 105 ++++------- 4 files changed, 358 insertions(+), 247 deletions(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index f094b1745..f9423ec57 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -19,7 +19,7 @@ use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{self, duration_as_us, MAX_ENTRY_IDS}; use solana_sdk::transaction::Transaction; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::{Arc, Mutex}; use std::thread::{self, Builder, JoinHandle}; @@ -37,6 +37,7 @@ pub struct BankingStage { bank_thread_hdls: Vec>, poh_service: PohService, leader_confirmation_service: LeaderConfirmationService, + poh_exit: Arc, } impl BankingStage { @@ -59,19 +60,18 @@ impl BankingStage { max_tick_height, }; - let poh_recorder = PohRecorder::new(bank.tick_height(), *last_entry_id); + let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( + bank.tick_height(), + *last_entry_id, + ))); // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its last_id is registered with the bank. let poh_exit = Arc::new(AtomicBool::new(false)); - let (poh_service, leader_sender) = - PohService::new(poh_recorder.clone(), config, poh_exit.clone()); - - leader_sender - .send(working_bank.clone()) - .expect("failed to send leader to poh_service"); + poh_recorder.lock().unwrap().set_working_bank(working_bank); + let poh_service = PohService::new(poh_recorder.clone(), config, poh_exit.clone()); // Single thread to compute confirmation let leader_confirmation_service = @@ -83,7 +83,6 @@ impl BankingStage { let thread_bank = bank.clone(); let thread_verified_receiver = shared_verified_receiver.clone(); let thread_poh_recorder = poh_recorder.clone(); - let thread_leader = working_bank.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -93,7 +92,6 @@ impl BankingStage { &thread_bank, &thread_verified_receiver, &thread_poh_recorder, - &thread_leader, ) { Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), Ok(more_unprocessed_packets) => { @@ -115,6 +113,7 @@ impl BankingStage { bank_thread_hdls, poh_service, leader_confirmation_service, + poh_exit, }, entry_receiver, ) @@ -135,8 +134,7 @@ impl BankingStage { fn record_transactions( txs: &[Transaction], results: &[bank::Result<()>], - poh: &PohRecorder, - working_bank: &WorkingBank, + poh: &Arc>, ) -> Result<()> { let processed_transactions: Vec<_> = results .iter() @@ -158,7 +156,7 @@ impl BankingStage { if !processed_transactions.is_empty() { let hash = Transaction::hash(&processed_transactions); // record and unlock will unlock all the successfull transactions - poh.record(hash, processed_transactions, working_bank)?; + poh.lock().unwrap().record(hash, processed_transactions)?; } Ok(()) } @@ -166,8 +164,7 @@ impl BankingStage { pub fn process_and_record_transactions( bank: &Bank, txs: &[Transaction], - poh: &PohRecorder, - working_bank: &WorkingBank, + poh: &Arc>, ) -> Result<()> { let now = Instant::now(); // Once accounts are locked, other threads cannot encode transactions that will modify the @@ -186,7 +183,7 @@ impl BankingStage { let record_time = { let now = Instant::now(); - Self::record_transactions(txs, &results, poh, working_bank)?; + Self::record_transactions(txs, &results, poh)?; now.elapsed() }; @@ -219,8 +216,7 @@ impl BankingStage { fn process_transactions( bank: &Arc, transactions: &[Transaction], - poh: &PohRecorder, - working_bank: &WorkingBank, + poh: &Arc>, ) -> Result<(usize)> { let mut chunk_start = 0; while chunk_start != transactions.len() { @@ -230,7 +226,6 @@ impl BankingStage { bank, &transactions[chunk_start..chunk_end], poh, - working_bank, ); if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result { break; @@ -245,8 +240,7 @@ impl BankingStage { pub fn process_packets( bank: &Arc, verified_receiver: &Arc>>, - poh: &PohRecorder, - working_bank: &WorkingBank, + poh: &Arc>, ) -> Result { let recv_start = Instant::now(); let mms = verified_receiver @@ -297,8 +291,7 @@ impl BankingStage { debug!("verified transactions {}", verified_transactions.len()); - let processed = - Self::process_transactions(bank, &verified_transactions, poh, working_bank)?; + let processed = Self::process_transactions(bank, &verified_transactions, poh)?; if processed < verified_transactions.len() { bank_shutdown = true; // Collect any unprocessed transactions in this batch for forwarding @@ -348,8 +341,9 @@ impl Service for BankingStage { for bank_thread_hdl in self.bank_thread_hdls { bank_thread_hdl.join()?; } + self.poh_exit.store(true, Ordering::Relaxed); self.leader_confirmation_service.join()?; - let _ = self.poh_service.join()?; + self.poh_service.join()?; Ok(()) } } @@ -610,7 +604,10 @@ mod tests { max_tick_height: std::u64::MAX, }; - let poh_recorder = PohRecorder::new(bank.tick_height(), bank.last_id()); + let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( + bank.tick_height(), + bank.last_id(), + ))); let pubkey = Keypair::new().pubkey(); let transactions = vec![ @@ -619,8 +616,8 @@ mod tests { ]; let mut results = vec![Ok(()), Ok(())]; - BankingStage::record_transactions(&transactions, &results, &poh_recorder, &working_bank) - .unwrap(); + poh_recorder.lock().unwrap().set_working_bank(working_bank); + BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); let entries = entry_receiver.recv().unwrap(); assert_eq!(entries[0].transactions.len(), transactions.len()); @@ -629,15 +626,13 @@ mod tests { 1, ProgramError::ResultWithNegativeTokens, )); - BankingStage::record_transactions(&transactions, &results, &poh_recorder, &working_bank) - .unwrap(); + BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); let entries = entry_receiver.recv().unwrap(); assert_eq!(entries[0].transactions.len(), transactions.len()); // Other BankErrors should not be recorded results[0] = Err(BankError::AccountNotFound); - BankingStage::record_transactions(&transactions, &results, &poh_recorder, &working_bank) - .unwrap(); + BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); let entries = entry_receiver.recv().unwrap(); assert_eq!(entries[0].transactions.len(), transactions.len() - 1); } @@ -663,16 +658,14 @@ mod tests { min_tick_height: bank.tick_height(), max_tick_height: bank.tick_height() + 1, }; - let poh_recorder = PohRecorder::new(bank.tick_height(), bank.last_id()); + let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( + bank.tick_height(), + bank.last_id(), + ))); + poh_recorder.lock().unwrap().set_working_bank(working_bank); - BankingStage::process_and_record_transactions( - &bank, - &transactions, - &poh_recorder, - &working_bank, - ) - .unwrap(); - poh_recorder.tick(&working_bank).unwrap(); + BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap(); + poh_recorder.lock().unwrap().tick(); let mut need_tick = true; // read entries until I find mine, might be ticks... @@ -697,12 +690,7 @@ mod tests { )]; assert_matches!( - BankingStage::process_and_record_transactions( - &bank, - &transactions, - &poh_recorder, - &working_bank - ), + BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder,), Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) ); diff --git a/src/poh.rs b/src/poh.rs index 635ce8136..1b4720879 100644 --- a/src/poh.rs +++ b/src/poh.rs @@ -3,7 +3,7 @@ use solana_sdk::hash::{hash, hashv, Hash}; pub struct Poh { - id: Hash, + pub id: Hash, num_hashes: u64, pub tick_height: u64, } diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs index 7af64dab9..eef0edeea 100644 --- a/src/poh_recorder.rs +++ b/src/poh_recorder.rs @@ -1,6 +1,15 @@ //! The `poh_recorder` module provides an object for synchronizing with Proof of History. //! It synchronizes PoH, bank's register_tick and the ledger //! +//! PohRecorder will send ticks or entries to a WorkingBank, if the current range of ticks is +//! within the specified WorkingBank range. +//! +//! For Ticks: +//! * tick must be > WorkingBank::min_tick_height && tick must be <= WorkingBank::man_tick_height +//! +//! For Entries: +//! * recorded entry must be >= WorkingBank::min_tick_height && entry must be < WorkingBank::man_tick_height +//! use crate::entry::Entry; use crate::poh::Poh; use crate::result::{Error, Result}; @@ -8,7 +17,7 @@ use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; use solana_sdk::transaction::Transaction; use std::sync::mpsc::Sender; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; #[derive(Debug, PartialEq, Eq, Clone)] pub enum PohRecorderError { @@ -25,118 +34,153 @@ pub struct WorkingBank { pub max_tick_height: u64, } -#[derive(Clone)] pub struct PohRecorder { - poh: Arc>, - tick_cache: Arc>>, + poh: Poh, + tick_cache: Vec<(Entry, u64)>, + working_bank: Option, } impl PohRecorder { - pub fn hash(&self) { - // TODO: amortize the cost of this lock by doing the loop in here for - // some min amount of hashes - let mut poh = self.poh.lock().unwrap(); - - poh.hash(); + pub fn clear_bank(&mut self) { + self.working_bank = None; } - fn flush_cache(&self, working_bank: &WorkingBank) -> Result<()> { + pub fn hash(&mut self) { + // TODO: amortize the cost of this lock by doing the loop in here for + // some min amount of hashes + self.poh.hash(); + } + + // synchronize PoH with a bank + pub fn reset(&mut self, tick_height: u64, last_id: Hash) { let mut cache = vec![]; - std::mem::swap(&mut cache, &mut self.tick_cache.lock().unwrap()); - if !cache.is_empty() { + info!( + "reset poh from: {},{} to: {},{}", + self.poh.id, self.poh.tick_height, last_id, tick_height, + ); + std::mem::swap(&mut cache, &mut self.tick_cache); + self.poh = Poh::new(last_id, tick_height); + } + + pub fn set_working_bank(&mut self, working_bank: WorkingBank) { + self.working_bank = Some(working_bank); + } + + // Flush cache will delay flushing the cache for a bank until it past the WorkingBank::min_tick_height + // On a record flush will flush the cache at the WorkingBank::min_tick_height, since a record + // occurs after the min_tick_height was generated + fn flush_cache(&mut self, tick: bool) -> Result<()> { + // check_tick_height is called before flush cache, so it cannot overrun the bank + // so a bank that is so late that it's slot fully generated before it starts recording + // will fail instead of broadcasting any ticks + let working_bank = self + .working_bank + .as_ref() + .ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?; + if self.poh.tick_height < working_bank.min_tick_height { + return Err(Error::PohRecorderError( + PohRecorderError::MinHeightNotReached, + )); + } + if tick && self.poh.tick_height == working_bank.min_tick_height { + return Err(Error::PohRecorderError( + PohRecorderError::MinHeightNotReached, + )); + } + + let cnt = self + .tick_cache + .iter() + .take_while(|x| x.1 <= working_bank.max_tick_height) + .count(); + let e = if cnt > 0 { + trace!( + "flush_cache: {} {} sending: {}", + working_bank.bank.tick_height(), + working_bank.max_tick_height, + cnt, + ); + let cache: Vec = self.tick_cache[..cnt].iter().map(|x| x.0.clone()).collect(); for t in &cache { working_bank.bank.register_tick(&t.id); } - working_bank.sender.send(cache)?; + working_bank.sender.send(cache) + } else { + Ok(()) + }; + if self.poh.tick_height >= working_bank.max_tick_height { + info!("poh_record: max_tick_height reached, setting working bank to None"); + self.working_bank = None; } + if e.is_err() { + info!("WorkingBank::sender disconnected {:?}", e); + //revert the cache, but clear the working bank + self.working_bank = None; + } else { + //commit the flush + let _ = self.tick_cache.drain(..cnt); + } + Ok(()) } - pub fn tick(&self, working_bank: &WorkingBank) -> Result<()> { + pub fn tick(&mut self) { // Register and send the entry out while holding the lock if the max PoH height // hasn't been reached. // This guarantees PoH order and Entry production and banks LastId queue is the same - let mut poh = self.poh.lock().unwrap(); - - Self::check_tick_height(&poh, working_bank).map_err(|e| { - let tick = Self::generate_tick(&mut poh); - self.tick_cache.lock().unwrap().push(tick); - e - })?; - ; - self.flush_cache(working_bank)?; - - Self::register_and_send_tick(&mut *poh, working_bank) + let tick = self.generate_tick(); + trace!("tick {}", tick.1); + self.tick_cache.push(tick); + let _ = self.flush_cache(true); } - pub fn record( - &self, - mixin: Hash, - txs: Vec, - working_bank: &WorkingBank, - ) -> Result<()> { + pub fn record(&mut self, mixin: Hash, txs: Vec) -> Result<()> { // Register and send the entry out while holding the lock. // This guarantees PoH order and Entry production and banks LastId queue is the same. - let mut poh = self.poh.lock().unwrap(); - - Self::check_tick_height(&poh, working_bank)?; - self.flush_cache(working_bank)?; - - Self::record_and_send_txs(&mut *poh, mixin, txs, working_bank) + self.flush_cache(false)?; + self.record_and_send_txs(mixin, txs) } /// A recorder to synchronize PoH with the following data structures /// * bank - the LastId's queue is updated on `tick` and `record` events /// * sender - the Entry channel that outputs to the ledger pub fn new(tick_height: u64, last_entry_id: Hash) -> Self { - let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height))); - let tick_cache = Arc::new(Mutex::new(vec![])); - PohRecorder { poh, tick_cache } - } - - fn check_tick_height(poh: &Poh, working_bank: &WorkingBank) -> Result<()> { - if poh.tick_height < working_bank.min_tick_height { - Err(Error::PohRecorderError( - PohRecorderError::MinHeightNotReached, - )) - } else if poh.tick_height >= working_bank.max_tick_height { - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) - } else { - Ok(()) + let poh = Poh::new(last_entry_id, tick_height); + PohRecorder { + poh, + tick_cache: vec![], + working_bank: None, } } - fn record_and_send_txs( - poh: &mut Poh, - mixin: Hash, - txs: Vec, - working_bank: &WorkingBank, - ) -> Result<()> { - let entry = poh.record(mixin); + fn record_and_send_txs(&mut self, mixin: Hash, txs: Vec) -> Result<()> { + let working_bank = self + .working_bank + .as_ref() + .ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?; + let entry = self.poh.record(mixin); assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function"); let entry = Entry { num_hashes: entry.num_hashes, id: entry.id, transactions: txs, }; + trace!("sending entry {}", entry.is_tick()); working_bank.sender.send(vec![entry])?; Ok(()) } - fn generate_tick(poh: &mut Poh) -> Entry { - let tick = poh.tick(); - Entry { - num_hashes: tick.num_hashes, - id: tick.id, - transactions: vec![], - } - } - - fn register_and_send_tick(poh: &mut Poh, working_bank: &WorkingBank) -> Result<()> { - let tick = Self::generate_tick(poh); - working_bank.bank.register_tick(&tick.id); - working_bank.sender.send(vec![tick])?; - Ok(()) + fn generate_tick(&mut self) -> (Entry, u64) { + let tick = self.poh.tick(); + assert_ne!(tick.tick_height, 0); + ( + Entry { + num_hashes: tick.num_hashes, + id: tick.id, + transactions: vec![], + }, + tick.tick_height, + ) } } @@ -150,51 +194,145 @@ mod tests { use std::sync::Arc; #[test] - fn test_poh_recorder() { + fn test_poh_recorder_no_zero_tick() { + let prev_id = Hash::default(); + let mut poh_recorder = PohRecorder::new(0, prev_id); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 1); + assert_eq!(poh_recorder.tick_cache[0].1, 1); + assert_eq!(poh_recorder.poh.tick_height, 1); + } + + #[test] + fn test_poh_recorder_tick_height_is_last_tick() { + let prev_id = Hash::default(); + let mut poh_recorder = PohRecorder::new(0, prev_id); + poh_recorder.tick(); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 2); + assert_eq!(poh_recorder.tick_cache[1].1, 2); + assert_eq!(poh_recorder.poh.tick_height, 2); + } + + #[test] + fn test_poh_recorder_reset_clears_cache() { + let mut poh_recorder = PohRecorder::new(0, Hash::default()); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 1); + poh_recorder.reset(0, Hash::default()); + assert_eq!(poh_recorder.tick_cache.len(), 0); + } + + #[test] + fn test_poh_recorder_clear() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_id = bank.last_id(); - let (entry_sender, entry_receiver) = channel(); - let poh_recorder = PohRecorder::new(0, prev_id); + let (entry_sender, _) = channel(); + let mut poh_recorder = PohRecorder::new(0, prev_id); let working_bank = WorkingBank { bank, sender: entry_sender, - min_tick_height: 0, - max_tick_height: 2, + min_tick_height: 2, + max_tick_height: 3, }; + poh_recorder.set_working_bank(working_bank); + assert!(poh_recorder.working_bank.is_some()); + poh_recorder.clear_bank(); + assert!(poh_recorder.working_bank.is_none()); + } - //send some data - let h1 = hash(b"hello world!"); + #[test] + fn test_poh_recorder_tick_sent_after_min() { + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_id = bank.last_id(); + let (entry_sender, entry_receiver) = channel(); + let mut poh_recorder = PohRecorder::new(0, prev_id); + + let working_bank = WorkingBank { + bank, + sender: entry_sender, + min_tick_height: 2, + max_tick_height: 3, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); + poh_recorder.tick(); + //tick height equal to min_tick_height + //no tick has been sent + assert_eq!(poh_recorder.tick_cache.last().unwrap().1, 2); + assert!(entry_receiver.try_recv().is_err()); + + // all ticks are sent after height > min + poh_recorder.tick(); + assert_eq!(poh_recorder.poh.tick_height, 3); + assert_eq!(poh_recorder.tick_cache.len(), 0); + let e = entry_receiver.recv().expect("recv 1"); + assert_eq!(e.len(), 3); + assert!(poh_recorder.working_bank.is_none()); + } + + #[test] + fn test_poh_recorder_tick_sent_upto_and_including_max() { + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_id = bank.last_id(); + let (entry_sender, entry_receiver) = channel(); + let mut poh_recorder = PohRecorder::new(0, prev_id); + + poh_recorder.tick(); + poh_recorder.tick(); + poh_recorder.tick(); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.last().unwrap().1, 4); + assert_eq!(poh_recorder.poh.tick_height, 4); + + let working_bank = WorkingBank { + bank, + sender: entry_sender, + min_tick_height: 2, + max_tick_height: 3, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); + + assert_eq!(poh_recorder.poh.tick_height, 5); + assert!(poh_recorder.working_bank.is_none()); + let e = entry_receiver.recv().expect("recv 1"); + assert_eq!(e.len(), 3); + } + + #[test] + fn test_poh_recorder_record_to_early() { + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_id = bank.last_id(); + let (entry_sender, entry_receiver) = channel(); + let mut poh_recorder = PohRecorder::new(0, prev_id); + + let working_bank = WorkingBank { + bank, + sender: entry_sender, + min_tick_height: 2, + max_tick_height: 3, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); let tx = test_tx(); - poh_recorder - .record(h1, vec![tx.clone()], &working_bank) - .unwrap(); - //get some events - let _e = entry_receiver.recv().unwrap(); - - poh_recorder.tick(&working_bank).unwrap(); - let _e = entry_receiver.recv().unwrap(); - - poh_recorder.tick(&working_bank).unwrap(); - let _e = entry_receiver.recv().unwrap(); - - // max tick height reached - assert!(poh_recorder.tick(&working_bank).is_err()); - assert!(poh_recorder.record(h1, vec![tx], &working_bank).is_err()); - - //make sure it handles channel close correctly - drop(entry_receiver); - assert!(poh_recorder.tick(&working_bank).is_err()); + let h1 = hash(b"hello world!"); + assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err()); + assert!(entry_receiver.try_recv().is_err()); } #[test] - fn test_poh_recorder_tick_cache() { + fn test_poh_recorder_record_at_min_passes() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_id = bank.last_id(); let (entry_sender, entry_receiver) = channel(); - let poh_recorder = PohRecorder::new(0, prev_id); + let mut poh_recorder = PohRecorder::new(0, prev_id); let working_bank = WorkingBank { bank, @@ -202,50 +340,72 @@ mod tests { min_tick_height: 1, max_tick_height: 2, }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 1); + assert_eq!(poh_recorder.poh.tick_height, 1); + let tx = test_tx(); + let h1 = hash(b"hello world!"); + assert!(poh_recorder.record(h1, vec![tx.clone()]).is_ok()); + assert_eq!(poh_recorder.tick_cache.len(), 0); - // tick should be cached - assert!(poh_recorder.tick(&working_bank).is_err()); - assert!(entry_receiver.try_recv().is_err()); - - // working_bank should be at the right height - poh_recorder.tick(&working_bank).unwrap(); - - let entries = entry_receiver.recv().unwrap(); - assert_eq!(entries.len(), 1); - let entries = entry_receiver.recv().unwrap(); - assert_eq!(entries.len(), 1); + //tick in the cache + entry + let e = entry_receiver.recv().expect("recv 1"); + assert_eq!(e.len(), 1); + assert!(e[0].is_tick()); + let e = entry_receiver.recv().expect("recv 2"); + assert!(!e[0].is_tick()); } #[test] - fn test_poh_recorder_tick_cache_old_working_bank() { + fn test_poh_recorder_record_at_max_fails() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_id = bank.last_id(); let (entry_sender, entry_receiver) = channel(); - let poh_recorder = PohRecorder::new(0, prev_id); + let mut poh_recorder = PohRecorder::new(0, prev_id); let working_bank = WorkingBank { bank, sender: entry_sender, min_tick_height: 1, - max_tick_height: 1, + max_tick_height: 2, }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); + poh_recorder.tick(); + assert_eq!(poh_recorder.poh.tick_height, 2); + let tx = test_tx(); + let h1 = hash(b"hello world!"); + assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err()); - // tick should be cached - assert_matches!( - poh_recorder.tick(&working_bank), - Err(Error::PohRecorderError( - PohRecorderError::MinHeightNotReached - )) - ); + let e = entry_receiver.recv().expect("recv 1"); + assert_eq!(e.len(), 2); + assert!(e[0].is_tick()); + assert!(e[1].is_tick()); + } - // working_bank should be past MaxHeight - assert_matches!( - poh_recorder.tick(&working_bank), - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) - ); - assert_eq!(poh_recorder.tick_cache.lock().unwrap().len(), 2); + #[test] + fn test_poh_cache_on_disconnect() { + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_id = bank.last_id(); + let (entry_sender, entry_receiver) = channel(); + let mut poh_recorder = PohRecorder::new(0, prev_id); - assert!(entry_receiver.try_recv().is_err()); + let working_bank = WorkingBank { + bank, + sender: entry_sender, + min_tick_height: 2, + max_tick_height: 3, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); + poh_recorder.tick(); + assert_eq!(poh_recorder.poh.tick_height, 2); + drop(entry_receiver); + poh_recorder.tick(); + assert!(poh_recorder.working_bank.is_none()); + assert_eq!(poh_recorder.tick_cache.len(), 3); } } diff --git a/src/poh_service.rs b/src/poh_service.rs index 5b3178cbd..f3000cea5 100644 --- a/src/poh_service.rs +++ b/src/poh_service.rs @@ -1,13 +1,11 @@ //! The `poh_service` module implements a service that records the passing of //! "ticks", a measure of time in the PoH stream -use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBank}; -use crate::result::{Error, Result}; +use crate::poh_recorder::PohRecorder; use crate::service::Service; use solana_sdk::timing::NUM_TICKS_PER_SECOND; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; @@ -29,7 +27,7 @@ impl Default for PohServiceConfig { } pub struct PohService { - tick_producer: JoinHandle>, + tick_producer: JoinHandle<()>, poh_exit: Arc, } @@ -38,95 +36,64 @@ impl PohService { self.poh_exit.store(true, Ordering::Relaxed); } - pub fn close(self) -> thread::Result> { + pub fn close(self) -> thread::Result<()> { self.exit(); self.join() } pub fn new( - poh_recorder: PohRecorder, + poh_recorder: Arc>, config: PohServiceConfig, poh_exit: Arc, - ) -> (Self, Sender) { + ) -> Self { // PohService is a headless producer, so when it exits it should notify the banking stage. // Since channel are not used to talk between these threads an AtomicBool is used as a // signal. let poh_exit_ = poh_exit.clone(); - let (working_bank_sender, working_bank_receiver) = channel(); // Single thread to generate ticks let tick_producer = Builder::new() .name("solana-poh-service-tick_producer".to_string()) .spawn(move || { - let mut poh_recorder = poh_recorder; - let working_bank_receiver = working_bank_receiver; - let return_value = Self::tick_producer( - &working_bank_receiver, - &mut poh_recorder, - config, - &poh_exit_, - ); + let poh_recorder = poh_recorder; + Self::tick_producer(&poh_recorder, config, &poh_exit_); poh_exit_.store(true, Ordering::Relaxed); - return_value }) .unwrap(); - ( - Self { - tick_producer, - poh_exit, - }, - working_bank_sender, - ) + Self { + tick_producer, + poh_exit, + } } fn tick_producer( - working_bank_receiver: &Receiver, - poh: &mut PohRecorder, + poh: &Arc>, config: PohServiceConfig, poh_exit: &AtomicBool, - ) -> Result<()> { - let mut working_bank = None; + ) { loop { - if working_bank.is_none() { - let result = working_bank_receiver.try_recv(); - working_bank = match result { - Err(TryRecvError::Empty) => None, - _ => Some(result?), - }; - } match config { PohServiceConfig::Tick(num) => { for _ in 1..num { - poh.hash(); + poh.lock().unwrap().hash(); } } PohServiceConfig::Sleep(duration) => { sleep(duration); } } - let result = if let Some(ref current_leader) = working_bank { - poh.tick(current_leader) - } else { - Ok(()) - }; - match result { - Err(Error::PohRecorderError(PohRecorderError::MinHeightNotReached)) => (), - Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => { - working_bank = None; - } - e => e?, - }; + poh.lock().unwrap().tick(); if poh_exit.load(Ordering::Relaxed) { - return Ok(()); + return; } } } } impl Service for PohService { - type JoinReturnType = Result<()>; + type JoinReturnType = (); - fn join(self) -> thread::Result> { + fn join(self) -> thread::Result<()> { self.tick_producer.join() } } @@ -134,6 +101,8 @@ impl Service for PohService { #[cfg(test)] mod tests { use super::*; + use crate::poh_recorder::WorkingBank; + use crate::result::Result; use crate::test_tx::test_tx; use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; @@ -147,7 +116,7 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let prev_id = bank.last_id(); let (entry_sender, entry_receiver) = channel(); - let poh_recorder = PohRecorder::new(bank.tick_height(), prev_id); + let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(bank.tick_height(), prev_id))); let exit = Arc::new(AtomicBool::new(false)); let working_bank = WorkingBank { bank: bank.clone(), @@ -158,7 +127,6 @@ mod tests { let entry_producer: JoinHandle> = { let poh_recorder = poh_recorder.clone(); - let working_bank = working_bank.clone(); let exit = exit.clone(); Builder::new() @@ -168,7 +136,7 @@ mod tests { // send some data let h1 = hash(b"hello world!"); let tx = test_tx(); - poh_recorder.record(h1, vec![tx], &working_bank).unwrap(); + poh_recorder.lock().unwrap().record(h1, vec![tx]).unwrap(); if exit.load(Ordering::Relaxed) { break Ok(()); @@ -179,15 +147,12 @@ mod tests { }; const HASHES_PER_TICK: u64 = 2; - let (poh_service, working_bank_sender) = PohService::new( + let poh_service = PohService::new( poh_recorder.clone(), PohServiceConfig::Tick(HASHES_PER_TICK as usize), Arc::new(AtomicBool::new(false)), ); - - working_bank_sender - .send(working_bank.clone()) - .expect("send"); + poh_recorder.lock().unwrap().set_working_bank(working_bank); // get some events let mut hashes = 0; @@ -230,7 +195,7 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let prev_id = bank.last_id(); let (entry_sender, entry_receiver) = channel(); - let poh_recorder = PohRecorder::new(bank.tick_height(), prev_id); + let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(bank.tick_height(), prev_id))); let exit = Arc::new(AtomicBool::new(false)); let working_bank = WorkingBank { bank: bank.clone(), @@ -239,22 +204,20 @@ mod tests { max_tick_height: bank.tick_height() + 5, }; - let (poh_service, working_bank_sender) = PohService::new( + let poh_service = PohService::new( poh_recorder.clone(), PohServiceConfig::default(), Arc::new(AtomicBool::new(false)), ); - working_bank_sender.send(working_bank).expect("send"); + poh_recorder.lock().unwrap().set_working_bank(working_bank); - // all 5 ticks are expected - // First 3 ticks must be sent all at once, since bank shouldn't see them until - // the bank's min_tick_height(3) is reached. - let entries = entry_receiver.recv().unwrap(); - assert_eq!(entries.len(), 3); - let entries = entry_receiver.recv().unwrap(); - assert_eq!(entries.len(), 1); - let entries = entry_receiver.recv().unwrap(); + // all 5 ticks are expected, there is no tick 0 + // First 4 ticks must be sent all at once, since bank shouldn't see them until + // the after bank's min_tick_height(3) is reached. + let entries = entry_receiver.recv().expect("recv 1"); + assert_eq!(entries.len(), 4); + let entries = entry_receiver.recv().expect("recv 2"); assert_eq!(entries.len(), 1); //WorkingBank should be dropped by the PohService thread as well