diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index ef4e08ee1a..a12ffeff0b 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -7,7 +7,8 @@ use rayon::prelude::*; use solana::banking_stage::BankingStage; use solana::entry::Entry; use solana::packet::to_packets_chunked; -use solana::poh_service::PohServiceConfig; +use solana::poh_recorder::PohRecorder; +use solana::poh_service::{PohService, PohServiceConfig}; use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::hash; @@ -16,8 +17,9 @@ use solana_sdk::signature::{KeypairUtil, Signature}; use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::timing::MAX_ENTRY_IDS; use std::iter; +use std::sync::atomic::AtomicBool; use std::sync::mpsc::{channel, Receiver}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use test::Bencher; @@ -39,6 +41,20 @@ fn check_txs(receiver: &Receiver>, ref_tx_count: usize) { assert_eq!(total, ref_tx_count); } +fn create_test_recorder(bank: &Arc) -> (Arc>, PohService) { + let exit = Arc::new(AtomicBool::new(false)); + let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( + bank.tick_height(), + bank.last_id(), + ))); + let poh_service = PohService::new( + poh_recorder.clone(), + &PohServiceConfig::default(), + exit.clone(), + ); + (poh_recorder, poh_service) +} + #[bench] #[ignore] fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { @@ -101,11 +117,11 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { (x, iter::repeat(1).take(len).collect()) }) .collect(); + let (poh_recorder, poh_service) = create_test_recorder(&bank); let (_stage, signal_receiver) = BankingStage::new( &bank, + &poh_recorder, verified_receiver, - PohServiceConfig::default(), - &genesis_block.last_id(), std::u64::MAX, genesis_block.bootstrap_leader_id, ); @@ -129,6 +145,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { start += half_len; start %= verified.len(); }); + poh_service.close().unwrap(); } #[bench] @@ -209,11 +226,11 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { (x, iter::repeat(1).take(len).collect()) }) .collect(); + let (poh_recorder, poh_service) = create_test_recorder(&bank); let (_stage, signal_receiver) = BankingStage::new( &bank, + &poh_recorder, verified_receiver, - PohServiceConfig::default(), - &genesis_block.last_id(), std::u64::MAX, genesis_block.bootstrap_leader_id, ); @@ -237,4 +254,5 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { start += half_len; start %= verified.len(); }); + poh_service.close().unwrap(); } diff --git a/sdk/src/timing.rs b/sdk/src/timing.rs index 5bf41b1e47..44148fdb5b 100644 --- a/sdk/src/timing.rs +++ b/sdk/src/timing.rs @@ -6,7 +6,7 @@ pub const NUM_TICKS_PER_SECOND: usize = 10; // At 10 ticks/s, 8 ticks per slot implies that leader rotation and voting will happen // every 800 ms. A fast voting cadence ensures faster finality and convergence -pub const DEFAULT_TICKS_PER_SLOT: u64 = 8; +pub const DEFAULT_TICKS_PER_SLOT: u64 = 80; pub const DEFAULT_SLOTS_PER_EPOCH: u64 = 64; /// The number of most recent `last_id` values that the bank will track the signatures diff --git a/src/banking_stage.rs b/src/banking_stage.rs index de0181b9b2..6734a2aaef 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -7,14 +7,12 @@ use crate::leader_confirmation_service::LeaderConfirmationService; use crate::packet::Packets; use crate::packet::SharedPackets; use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBank}; -use crate::poh_service::{PohService, PohServiceConfig}; use crate::result::{Error, Result}; use crate::service::Service; use crate::sigverify_stage::VerifiedPackets; use bincode::deserialize; use solana_metrics::counter::Counter; use solana_runtime::bank::{self, Bank, BankError}; -use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{self, duration_as_us, MAX_ENTRY_IDS}; use solana_sdk::transaction::Transaction; @@ -34,9 +32,8 @@ pub const NUM_THREADS: u32 = 10; /// Stores the stage's thread handle and output receiver. pub struct BankingStage { bank_thread_hdls: Vec>, - poh_service: PohService, + exit: Arc, leader_confirmation_service: LeaderConfirmationService, - poh_exit: Arc, } impl BankingStage { @@ -44,14 +41,12 @@ impl BankingStage { #[allow(clippy::new_ret_no_self)] pub fn new( bank: &Arc, + poh_recorder: &Arc>, verified_receiver: Receiver, - config: PohServiceConfig, - last_entry_id: &Hash, max_tick_height: u64, leader_id: Pubkey, ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); - let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); let working_bank = WorkingBank { bank: bank.clone(), sender: entry_sender, @@ -59,29 +54,31 @@ impl BankingStage { max_tick_height, }; - let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( - bank.tick_height(), - *last_entry_id, - ))); + info!( + "new working bank {} {} {}", + working_bank.min_tick_height, + working_bank.max_tick_height, + poh_recorder.lock().unwrap().poh.tick_height + ); + poh_recorder.lock().unwrap().set_working_bank(working_bank); + + let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its last_id is registered with the bank. - let poh_exit = Arc::new(AtomicBool::new(false)); - - poh_recorder.lock().unwrap().set_working_bank(working_bank); - let poh_service = PohService::new(poh_recorder.clone(), config, poh_exit.clone()); + let exit = Arc::new(AtomicBool::new(false)); // Single thread to compute confirmation let leader_confirmation_service = - LeaderConfirmationService::new(bank.clone(), leader_id, poh_exit.clone()); + LeaderConfirmationService::new(bank.clone(), leader_id, exit.clone()); // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..Self::num_threads()) .map(|_| { - let thread_bank = bank.clone(); let thread_verified_receiver = shared_verified_receiver.clone(); let thread_poh_recorder = poh_recorder.clone(); + let thread_bank = bank.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -110,9 +107,8 @@ impl BankingStage { ( Self { bank_thread_hdls, - poh_service, + exit, leader_confirmation_service, - poh_exit, }, entry_receiver, ) @@ -213,7 +209,7 @@ impl BankingStage { /// Returns the number of transactions successfully processed by the bank, which may be less /// than the total number if max PoH height was reached and the bank halted fn process_transactions( - bank: &Arc, + bank: &Bank, transactions: &[Transaction], poh: &Arc>, ) -> Result<(usize)> { @@ -226,7 +222,9 @@ impl BankingStage { &transactions[chunk_start..chunk_end], poh, ); + trace!("process_transcations: {:?}", result); if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result { + info!("process transactions: max height reached"); break; } result?; @@ -237,7 +235,7 @@ impl BankingStage { /// Process the incoming packets pub fn process_packets( - bank: &Arc, + bank: &Bank, verified_receiver: &Arc>>, poh: &Arc>, ) -> Result { @@ -340,9 +338,8 @@ impl Service for BankingStage { for bank_thread_hdl in self.bank_thread_hdls { bank_thread_hdl.join()?; } - self.poh_exit.store(true, Ordering::Relaxed); + self.exit.store(true, Ordering::Relaxed); self.leader_confirmation_service.join()?; - self.poh_service.join()?; Ok(()) } } @@ -352,6 +349,7 @@ mod tests { use super::*; use crate::entry::EntrySlice; use crate::packet::to_packets; + use crate::poh_service::{PohService, PohServiceConfig}; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::native_program::ProgramError; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -359,21 +357,36 @@ mod tests { use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT; use std::thread::sleep; + fn create_test_recorder(bank: &Arc) -> (Arc>, PohService) { + let exit = Arc::new(AtomicBool::new(false)); + let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( + bank.tick_height(), + bank.last_id(), + ))); + let poh_service = PohService::new( + poh_recorder.clone(), + &PohServiceConfig::default(), + exit.clone(), + ); + (poh_recorder, poh_service) + } + #[test] fn test_banking_stage_shutdown1() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let (verified_sender, verified_receiver) = channel(); + let (poh_recorder, poh_service) = create_test_recorder(&bank); let (banking_stage, _entry_receiver) = BankingStage::new( &bank, + &poh_recorder, verified_receiver, - PohServiceConfig::default(), - &bank.last_id(), DEFAULT_TICKS_PER_SLOT, genesis_block.bootstrap_leader_id, ); drop(verified_sender); banking_stage.join().unwrap(); + poh_service.close().unwrap(); } #[test] @@ -382,11 +395,11 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_id(); let (verified_sender, verified_receiver) = channel(); + let (poh_recorder, poh_service) = create_test_recorder(&bank); let (banking_stage, entry_receiver) = BankingStage::new( &bank, + &poh_recorder, verified_receiver, - PohServiceConfig::Sleep(Duration::from_millis(1)), - &bank.last_id(), DEFAULT_TICKS_PER_SLOT, genesis_block.bootstrap_leader_id, ); @@ -401,6 +414,7 @@ mod tests { assert!(entries.verify(&start_hash)); assert_eq!(entries[entries.len() - 1].id, bank.last_id()); banking_stage.join().unwrap(); + poh_service.close().unwrap(); } #[test] @@ -409,11 +423,11 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_id(); let (verified_sender, verified_receiver) = channel(); + let (poh_recorder, poh_service) = create_test_recorder(&bank); let (banking_stage, entry_receiver) = BankingStage::new( &bank, + &poh_recorder, verified_receiver, - PohServiceConfig::default(), - &bank.last_id(), DEFAULT_TICKS_PER_SLOT, genesis_block.bootstrap_leader_id, ); @@ -457,6 +471,7 @@ mod tests { }); drop(entry_receiver); banking_stage.join().unwrap(); + poh_service.close().unwrap(); } #[test] fn test_banking_stage_entryfication() { @@ -466,11 +481,11 @@ mod tests { let (genesis_block, mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let (verified_sender, verified_receiver) = channel(); + let (poh_recorder, poh_service) = create_test_recorder(&bank); let (banking_stage, entry_receiver) = BankingStage::new( &bank, + &poh_recorder, verified_receiver, - PohServiceConfig::default(), - &bank.last_id(), DEFAULT_TICKS_PER_SLOT, genesis_block.bootstrap_leader_id, ); @@ -523,20 +538,22 @@ mod tests { .for_each(|x| assert_eq!(*x, Ok(()))); } assert_eq!(bank.get_balance(&alice.pubkey()), 1); + poh_service.close().unwrap(); } // Test that when the max_tick_height is reached, the banking stage exits #[test] fn test_max_tick_height_shutdown() { + solana_logger::setup(); let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let (verified_sender, verified_receiver) = channel(); let max_tick_height = 10; + let (poh_recorder, poh_service) = create_test_recorder(&bank); let (banking_stage, _entry_receiver) = BankingStage::new( &bank, + &poh_recorder, verified_receiver, - PohServiceConfig::default(), - &bank.last_id(), max_tick_height, genesis_block.bootstrap_leader_id, ); @@ -551,6 +568,7 @@ mod tests { drop(verified_sender); banking_stage.join().unwrap(); + poh_service.close().unwrap(); } #[test] @@ -560,11 +578,11 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let ticks_per_slot = 1; let (verified_sender, verified_receiver) = channel(); + let (poh_recorder, poh_service) = create_test_recorder(&bank); let (mut banking_stage, _entry_receiver) = BankingStage::new( &bank, + &poh_recorder, verified_receiver, - PohServiceConfig::default(), - &bank.last_id(), ticks_per_slot, genesis_block.bootstrap_leader_id, ); @@ -599,6 +617,7 @@ mod tests { let (packets, start_index) = &unprocessed_packets[0]; assert_eq!(packets.read().unwrap().packets.len(), 1); // TODO: maybe compare actual packet contents too assert_eq!(*start_index, 0); + poh_service.close().unwrap(); } #[test] @@ -617,6 +636,7 @@ mod tests { bank.tick_height(), bank.last_id(), ))); + poh_recorder.lock().unwrap().set_working_bank(working_bank); let pubkey = Keypair::new().pubkey(); let transactions = vec![ @@ -625,7 +645,6 @@ mod tests { ]; let mut results = vec![Ok(()), Ok(())]; - poh_recorder.lock().unwrap().set_working_bank(working_bank); BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); let entries = entry_receiver.recv().unwrap(); assert_eq!(entries[0].0.transactions.len(), transactions.len()); @@ -648,6 +667,7 @@ mod tests { #[test] fn test_bank_process_and_record_transactions() { + solana_logger::setup(); let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); let bank = Arc::new(Bank::new(&genesis_block)); let pubkey = Keypair::new().pubkey(); @@ -678,18 +698,21 @@ mod tests { let mut need_tick = true; // read entries until I find mine, might be ticks... - while need_tick { - let entries = entry_receiver.recv().unwrap(); + while let Ok(entries) = entry_receiver.recv() { for (entry, _) in entries { if !entry.is_tick() { + trace!("got entry"); assert_eq!(entry.transactions.len(), transactions.len()); assert_eq!(bank.get_balance(&pubkey), 1); - } else { need_tick = false; + } else { + break; } } } + assert_eq!(need_tick, false); + let transactions = vec![SystemTransaction::new_move( &mint_keypair, pubkey, diff --git a/src/fullnode.rs b/src/fullnode.rs index 6000842620..b8feb29106 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -6,7 +6,8 @@ use crate::blocktree_processor::{self, BankForksInfo}; use crate::cluster_info::{ClusterInfo, Node, NodeInfo}; use crate::gossip_service::GossipService; use crate::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig}; -use crate::poh_service::PohServiceConfig; +use crate::poh_recorder::PohRecorder; +use crate::poh_service::{PohService, PohServiceConfig}; use crate::rpc_pubsub_service::PubSubService; use crate::rpc_service::JsonRpcService; use crate::rpc_subscriptions::RpcSubscriptions; @@ -23,7 +24,7 @@ use std::net::UdpSocket; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, Result}; use std::time::Duration; @@ -102,6 +103,8 @@ pub struct Fullnode { blocktree: Arc, bank_forks: Arc>, leader_scheduler: Arc>, + poh_service: PohService, + poh_recorder: Arc>, } impl Fullnode { @@ -124,9 +127,25 @@ impl Fullnode { let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new( &config.leader_scheduler_config, ))); - let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = + let (mut bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = new_banks_from_blocktree(ledger_path, config.ticks_per_slot(), &leader_scheduler); + let exit = Arc::new(AtomicBool::new(false)); + let bank_info = &bank_forks_info[0]; + bank_forks.set_working_bank_id(bank_info.bank_id); + let bank = bank_forks.working_bank(); + + info!( + "starting PoH... {} {}", + bank.tick_height(), + bank_info.last_entry_id + ); + let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( + bank.tick_height(), + bank_info.last_entry_id, + ))); + let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, exit.clone()); + info!("node info: {:?}", node.info); info!("node entrypoint_info: {:?}", entrypoint_info_option); info!( @@ -134,7 +153,6 @@ impl Fullnode { node.sockets.gossip.local_addr().unwrap() ); - let exit = Arc::new(AtomicBool::new(false)); let blocktree = Arc::new(blocktree); let bank_forks = Arc::new(RwLock::new(bank_forks)); @@ -253,6 +271,8 @@ impl Fullnode { blocktree, bank_forks, leader_scheduler, + poh_service, + poh_recorder, } } @@ -293,7 +313,7 @@ impl Fullnode { }; self.node_services.tpu.switch_to_leader( self.bank_forks.read().unwrap().working_bank(), - PohServiceConfig::default(), + &self.poh_recorder, self.tpu_sockets .iter() .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) @@ -303,7 +323,6 @@ impl Fullnode { .expect("Failed to clone broadcast socket"), self.sigverify_disabled, rotation_info.slot, - rotation_info.last_entry_id, &self.blocktree, ); transition @@ -339,6 +358,13 @@ impl Fullnode { match self.rotation_receiver.recv_timeout(timeout) { Ok(rotation_info) => { + trace!("{:?}: rotate at slot={}", self.id, rotation_info.slot); + //TODO: this will be called by the TVU every time it votes + //instead of here + self.poh_recorder.lock().unwrap().reset( + rotation_info.bank.tick_height(), + rotation_info.last_entry_id, + ); let slot = rotation_info.slot; let transition = self.rotate(rotation_info); debug!("role transition complete: {:?}", transition); @@ -360,16 +386,25 @@ impl Fullnode { // Used for notifying many nodes in parallel to exit fn exit(&self) { self.exit.store(true, Ordering::Relaxed); + // Need to force the poh_recorder to drop the WorkingBank, + // which contains the channel to BroadcastStage. This should be + // sufficient as long as no other rotations are happening that + // can cause the Tpu to restart a BankingStage and reset a + // WorkingBank in poh_recorder. It follows no other rotations can be + // in motion because exit()/close() are only called by the run() loop + // which is the sole initiator of rotations. + self.poh_recorder.lock().unwrap().clear_bank(); if let Some(ref rpc_service) = self.rpc_service { rpc_service.exit(); } if let Some(ref rpc_pubsub_service) = self.rpc_pubsub_service { rpc_pubsub_service.exit(); } - self.node_services.exit() + self.node_services.exit(); + self.poh_service.exit() } - pub fn close(self) -> Result<()> { + fn close(self) -> Result<()> { self.exit(); self.join() } @@ -411,6 +446,9 @@ impl Service for Fullnode { self.gossip_service.join()?; self.node_services.join()?; + trace!("exit node_services!"); + self.poh_service.join()?; + trace!("exit poh!"); Ok(()) } } diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs index 6bda70287b..36615c8e11 100644 --- a/src/poh_recorder.rs +++ b/src/poh_recorder.rs @@ -35,7 +35,7 @@ pub struct WorkingBank { } pub struct PohRecorder { - poh: Poh, + pub poh: Poh, tick_cache: Vec<(Entry, u64)>, working_bank: Option, } diff --git a/src/poh_service.rs b/src/poh_service.rs index 2d2d151426..363f2e3152 100644 --- a/src/poh_service.rs +++ b/src/poh_service.rs @@ -5,11 +5,12 @@ use crate::poh_recorder::PohRecorder; use crate::service::Service; use solana_sdk::timing::NUM_TICKS_PER_SECOND; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::SyncSender; use std::sync::{Arc, Mutex}; use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; -#[derive(Copy, Clone)] +#[derive(Clone)] pub enum PohServiceConfig { /// * `Tick` - Run full PoH thread. Tick is a rough estimate of how many hashes to roll before /// transmitting a new entry. @@ -17,6 +18,8 @@ pub enum PohServiceConfig { /// * `Sleep`- Low power mode. Sleep is a rough estimate of how long to sleep before rolling 1 /// PoH once and producing 1 tick. Sleep(Duration), + /// each node in simulation will be blocked until the receiver reads their step + Step(SyncSender<()>), } impl Default for PohServiceConfig { @@ -43,7 +46,7 @@ impl PohService { pub fn new( poh_recorder: Arc>, - config: PohServiceConfig, + config: &PohServiceConfig, poh_exit: Arc, ) -> Self { // PohService is a headless producer, so when it exits it should notify the banking stage. @@ -51,11 +54,12 @@ impl PohService { // signal. let poh_exit_ = poh_exit.clone(); // Single thread to generate ticks + let config = config.clone(); let tick_producer = Builder::new() .name("solana-poh-service-tick_producer".to_string()) .spawn(move || { let poh_recorder = poh_recorder; - Self::tick_producer(&poh_recorder, config, &poh_exit_); + Self::tick_producer(&poh_recorder, &config, &poh_exit_); poh_exit_.store(true, Ordering::Relaxed); }) .unwrap(); @@ -68,18 +72,24 @@ impl PohService { fn tick_producer( poh: &Arc>, - config: PohServiceConfig, + config: &PohServiceConfig, poh_exit: &AtomicBool, ) { loop { match config { PohServiceConfig::Tick(num) => { - for _ in 1..num { + for _ in 1..*num { poh.lock().unwrap().hash(); } } PohServiceConfig::Sleep(duration) => { - sleep(duration); + sleep(*duration); + } + PohServiceConfig::Step(sender) => { + let r = sender.send(()); + if r.is_err() { + break; + } } } poh.lock().unwrap().tick(); @@ -149,7 +159,7 @@ mod tests { const HASHES_PER_TICK: u64 = 2; let poh_service = PohService::new( poh_recorder.clone(), - PohServiceConfig::Tick(HASHES_PER_TICK as usize), + &PohServiceConfig::Tick(HASHES_PER_TICK as usize), Arc::new(AtomicBool::new(false)), ); poh_recorder.lock().unwrap().set_working_bank(working_bank); @@ -207,7 +217,7 @@ mod tests { let poh_service = PohService::new( poh_recorder.clone(), - PohServiceConfig::default(), + &PohServiceConfig::default(), Arc::new(AtomicBool::new(false)), ); diff --git a/src/thin_client.rs b/src/thin_client.rs index 9b3a5aed32..ea30671888 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -256,13 +256,18 @@ impl ThinClient { /// Request a new last Entry ID from the server. This method blocks /// until the server sends a response. pub fn get_next_last_id(&mut self, previous_last_id: &Hash) -> Hash { + self.get_next_last_id_ext(previous_last_id, &|| { + sleep(Duration::from_millis(100)); + }) + } + pub fn get_next_last_id_ext(&mut self, previous_last_id: &Hash, func: &Fn()) -> Hash { loop { let last_id = self.get_last_id(); if last_id != *previous_last_id { break last_id; } debug!("Got same last_id ({:?}), will retry...", last_id); - sleep(Duration::from_millis(100)); + func() } } diff --git a/src/tpu.rs b/src/tpu.rs index 5f5b543972..95d797b476 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -7,17 +7,16 @@ use crate::broadcast_service::BroadcastService; use crate::cluster_info::ClusterInfo; use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::fetch_stage::FetchStage; -use crate::poh_service::PohServiceConfig; +use crate::poh_recorder::PohRecorder; use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; use crate::tpu_forwarder::TpuForwarder; use solana_runtime::bank::Bank; -use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread; pub enum TpuMode { @@ -191,12 +190,11 @@ impl Tpu { pub fn switch_to_leader( &mut self, bank: Arc, - tick_duration: PohServiceConfig, + poh_recorder: &Arc>, transactions_sockets: Vec, broadcast_socket: UdpSocket, sigverify_disabled: bool, slot: u64, - last_entry_id: Hash, blocktree: &Arc, ) { self.close_and_forward_unprocessed_packets(); @@ -230,9 +228,8 @@ impl Tpu { let (banking_stage, entry_receiver) = BankingStage::new( &bank, + poh_recorder, verified_receiver, - tick_duration, - &last_entry_id, max_tick_height, self.id, ); diff --git a/tests/multinode.rs b/tests/multinode.rs index 328ef5679c..4324aa4fe9 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -7,6 +7,7 @@ use solana::entry::{reconstruct_entries_from_blobs, Entry}; use solana::fullnode::{new_banks_from_blocktree, Fullnode, FullnodeConfig, FullnodeReturnType}; use solana::gossip_service::{converge, make_listening_node}; use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; +use solana::poh_service::PohServiceConfig; use solana::result; use solana::service::Service; use solana::thin_client::{poll_gossip_for_leader, retry_get_balance}; @@ -20,7 +21,7 @@ use std::collections::{HashSet, VecDeque}; use std::env; use std::fs::remove_dir_all; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, TryRecvError}; +use std::sync::mpsc::{channel, sync_channel, TryRecvError}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder}; use std::time::{Duration, Instant}; @@ -1672,8 +1673,10 @@ fn test_fullnode_rotate( // Create fullnode config, and set leader scheduler policies let mut fullnode_config = FullnodeConfig::default(); + let (tick_step_sender, tick_step_receiver) = sync_channel(1); fullnode_config.leader_scheduler_config.ticks_per_slot = ticks_per_slot; fullnode_config.leader_scheduler_config.slots_per_epoch = slots_per_epoch; + fullnode_config.tick_config = PohServiceConfig::Step(tick_step_sender); // Note: when debugging failures in this test, disabling voting can help keep the log noise // down by removing the extra vote transactions @@ -1869,8 +1872,10 @@ fn test_fullnode_rotate( if transact { let mut client = mk_client(&leader_info); - - client_last_id = client.get_next_last_id(&client_last_id); + client_last_id = client.get_next_last_id_ext(&client_last_id, &|| { + tick_step_receiver.recv().expect("tick step"); + sleep(Duration::from_millis(100)); + }); info!("Transferring 500 tokens, last_id={:?}", client_last_id); expected_bob_balance += 500; @@ -1878,14 +1883,24 @@ fn test_fullnode_rotate( .transfer(500, &mint_keypair, bob, &client_last_id) .unwrap(); debug!("transfer send, signature is {:?}", signature); - client.poll_for_signature(&signature).unwrap(); + for _ in 0..30 { + if client.poll_for_signature(&signature).is_err() { + tick_step_receiver.recv().expect("tick step"); + info!("poll for signature tick step received"); + } else { + break; + } + } debug!("transfer signature confirmed"); let actual_bob_balance = retry_get_balance(&mut client, &bob, Some(expected_bob_balance)).unwrap(); assert_eq!(actual_bob_balance, expected_bob_balance); debug!("account balance confirmed: {}", actual_bob_balance); - client_last_id = client.get_next_last_id(&client_last_id); + client_last_id = client.get_next_last_id_ext(&client_last_id, &|| { + tick_step_receiver.recv().expect("tick step"); + sleep(Duration::from_millis(100)); + }); } else { if include_validator { trace!("waiting for leader and validator to reach max tick height..."); @@ -1893,6 +1908,8 @@ fn test_fullnode_rotate( trace!("waiting for leader to reach max tick height..."); } } + tick_step_receiver.recv().expect("tick step"); + info!("tick step received"); } if transact { @@ -1901,6 +1918,7 @@ fn test_fullnode_rotate( } info!("Shutting down"); + drop(tick_step_receiver); for node_exit in node_exits { node_exit(); }