diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index 9d39cb8a48..cfa4cf62cb 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -100,7 +100,6 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { (x, iter::repeat(1).take(len).collect()) }) .collect(); - let (to_leader_sender, _to_leader_recvr) = channel(); let (_stage, signal_receiver) = BankingStage::new( &bank, verified_receiver, @@ -108,7 +107,6 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { &genesis_block.last_id(), std::u64::MAX, genesis_block.bootstrap_leader_id, - &to_leader_sender, ); let mut id = genesis_block.last_id(); @@ -209,7 +207,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { (x, iter::repeat(1).take(len).collect()) }) .collect(); - let (to_leader_sender, _to_leader_recvr) = channel(); let (_stage, signal_receiver) = BankingStage::new( &bank, verified_receiver, @@ -217,7 +214,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { &genesis_block.last_id(), std::u64::MAX, genesis_block.bootstrap_leader_id, - &to_leader_sender, ); let mut id = genesis_block.last_id(); diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 86d6c7a4e6..3a38095948 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -14,7 +14,6 @@ use crate::poh_service::{PohService, PohServiceConfig}; use crate::result::{Error, Result}; use crate::service::Service; use crate::sigverify_stage::VerifiedPackets; -use crate::tpu::TpuRotationSender; use bincode::deserialize; use log::Level; use solana_sdk::hash::Hash; @@ -51,7 +50,6 @@ impl BankingStage { last_entry_id: &Hash, max_tick_height: u64, leader_id: Pubkey, - to_validator_sender: &TpuRotationSender, ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); @@ -61,8 +59,7 @@ impl BankingStage { // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its last_id is registered with the bank. - let poh_service = - PohService::new(poh_recorder.clone(), config, to_validator_sender.clone()); + let poh_service = PohService::new(poh_recorder.clone(), config); // Single thread to compute confirmation let compute_confirmation_service = ComputeLeaderConfirmationService::new( @@ -357,7 +354,6 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let (verified_sender, verified_receiver) = channel(); - let (to_validator_sender, _) = channel(); let (banking_stage, _entry_receiver) = BankingStage::new( &bank, verified_receiver, @@ -365,7 +361,6 @@ mod tests { &bank.last_id(), DEFAULT_TICKS_PER_SLOT, genesis_block.bootstrap_leader_id, - &to_validator_sender, ); drop(verified_sender); banking_stage.join().unwrap(); @@ -377,7 +372,6 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_id(); let (verified_sender, verified_receiver) = channel(); - let (to_validator_sender, _) = channel(); let (banking_stage, entry_receiver) = BankingStage::new( &bank, verified_receiver, @@ -385,7 +379,6 @@ mod tests { &bank.last_id(), DEFAULT_TICKS_PER_SLOT, genesis_block.bootstrap_leader_id, - &to_validator_sender, ); sleep(Duration::from_millis(500)); drop(verified_sender); @@ -403,7 +396,6 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_id(); let (verified_sender, verified_receiver) = channel(); - let (to_validator_sender, _) = channel(); let (banking_stage, entry_receiver) = BankingStage::new( &bank, verified_receiver, @@ -411,7 +403,6 @@ mod tests { &bank.last_id(), DEFAULT_TICKS_PER_SLOT, genesis_block.bootstrap_leader_id, - &to_validator_sender, ); // good tx @@ -458,7 +449,6 @@ mod tests { let (genesis_block, mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let (verified_sender, verified_receiver) = channel(); - let (to_validator_sender, _) = channel(); let (banking_stage, entry_receiver) = BankingStage::new( &bank, verified_receiver, @@ -466,7 +456,6 @@ mod tests { &bank.last_id(), DEFAULT_TICKS_PER_SLOT, genesis_block.bootstrap_leader_id, - &to_validator_sender, ); // Process a batch that includes a transaction that receives two tokens. @@ -522,7 +511,6 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let (verified_sender, verified_receiver) = channel(); - let (to_validator_sender, to_validator_receiver) = channel(); let max_tick_height = 10; let (banking_stage, _entry_receiver) = BankingStage::new( &bank, @@ -531,9 +519,16 @@ mod tests { &bank.last_id(), max_tick_height, genesis_block.bootstrap_leader_id, - &to_validator_sender, ); - assert_eq!(to_validator_receiver.recv().unwrap(), max_tick_height); + + loop { + let bank_tick_height = bank.tick_height(); + if bank_tick_height >= max_tick_height { + break; + } + sleep(Duration::from_millis(10)); + } + drop(verified_sender); banking_stage.join().unwrap(); } @@ -545,7 +540,6 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let ticks_per_slot = 1; let (verified_sender, verified_receiver) = channel(); - let (to_validator_sender, to_validator_receiver) = channel(); let (mut banking_stage, _entry_receiver) = BankingStage::new( &bank, verified_receiver, @@ -553,11 +547,16 @@ mod tests { &bank.last_id(), ticks_per_slot, genesis_block.bootstrap_leader_id, - &to_validator_sender, ); // Wait for Poh recorder to hit max height - assert_eq!(to_validator_receiver.recv().unwrap(), ticks_per_slot); + loop { + let bank_tick_height = bank.tick_height(); + if bank_tick_height >= leader_scheduler_config.ticks_per_slot { + break; + } + sleep(Duration::from_millis(10)); + } // Now send a transaction to the banking stage let transaction = SystemTransaction::new_account( diff --git a/src/fullnode.rs b/src/fullnode.rs index 7fed0fe123..42d5d07cd8 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -14,7 +14,7 @@ use crate::rpc_pubsub_service::PubSubService; use crate::rpc_service::JsonRpcService; use crate::service::Service; use crate::storage_stage::StorageState; -use crate::tpu::{Tpu, TpuRotationReceiver, TpuRotationSender}; +use crate::tpu::{Tpu, TpuRotationReceiver}; use crate::tvu::{Sockets, Tvu}; use crate::voting_keypair::VotingKeypair; use log::Level; @@ -104,7 +104,6 @@ pub struct Fullnode { tpu_sockets: Vec, broadcast_socket: UdpSocket, node_services: NodeServices, - rotation_sender: TpuRotationSender, rotation_receiver: TpuRotationReceiver, blocktree: Arc, leader_scheduler: Arc>, @@ -279,7 +278,6 @@ impl Fullnode { exit, tpu_sockets: node.sockets.tpu, broadcast_socket: node.sockets.broadcast, - rotation_sender, rotation_receiver, blocktree, leader_scheduler, @@ -371,7 +369,6 @@ impl Fullnode { max_tick_height, blob_index, last_entry_id, - &self.rotation_sender, &self.blocktree, &self.leader_scheduler, ); @@ -920,14 +917,8 @@ mod tests { // Wait for convergence converge(&leader_node_info, 2); - info!("Wait for leader -> validator transition"); - let rotation_signal = leader - .rotation_receiver - .recv() - .expect("signal for leader -> validator transition"); - debug!("received rotation signal: {:?}", rotation_signal); - // Re-send the rotation signal, it'll be received again once the tvu is unpaused - leader.rotation_sender.send(rotation_signal).expect("send"); + // Wait for Tpu bank to progress while the Tvu bank is stuck + sleep(Duration::from_millis(1000)); info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)"); { diff --git a/src/poh_service.rs b/src/poh_service.rs index 9716476a63..d849502be9 100644 --- a/src/poh_service.rs +++ b/src/poh_service.rs @@ -1,11 +1,9 @@ //! The `poh_service` module implements a service that records the passing of //! "ticks", a measure of time in the PoH stream -use crate::poh_recorder::{PohRecorder, PohRecorderError}; -use crate::result::Error; +use crate::poh_recorder::PohRecorder; use crate::result::Result; use crate::service::Service; -use crate::tpu::TpuRotationSender; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread::sleep; @@ -46,11 +44,7 @@ impl PohService { self.join() } - pub fn new( - poh_recorder: PohRecorder, - config: PohServiceConfig, - to_validator_sender: TpuRotationSender, - ) -> Self { + pub fn new(poh_recorder: PohRecorder, config: PohServiceConfig) -> Self { // PohService is a headless producer, so when it exits it should notify the banking stage. // Since channel are not used to talk between these threads an AtomicBool is used as a // signal. @@ -61,12 +55,7 @@ impl PohService { .name("solana-poh-service-tick_producer".to_string()) .spawn(move || { let mut poh_recorder_ = poh_recorder; - let return_value = Self::tick_producer( - &mut poh_recorder_, - config, - &poh_exit_, - &to_validator_sender, - ); + let return_value = Self::tick_producer(&mut poh_recorder_, config, &poh_exit_); poh_exit_.store(true, Ordering::Relaxed); return_value }) @@ -82,18 +71,13 @@ impl PohService { poh: &mut PohRecorder, config: PohServiceConfig, poh_exit: &AtomicBool, - to_validator_sender: &TpuRotationSender, ) -> Result<()> { - let max_tick_height = poh.max_tick_height(); loop { match config { PohServiceConfig::Tick(num) => { for _ in 1..num { let res = poh.hash(); if let Err(e) = res { - if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e { - to_validator_sender.send(max_tick_height)?; - } return Err(e); } } @@ -104,9 +88,6 @@ impl PohService { } let res = poh.tick(); if let Err(e) = res { - if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e { - to_validator_sender.send(max_tick_height)?; - } return Err(e); } if poh_exit.load(Ordering::Relaxed) { @@ -164,11 +145,9 @@ mod tests { }; const HASHES_PER_TICK: u64 = 2; - let (sender, _) = channel(); let poh_service = PohService::new( poh_recorder, PohServiceConfig::Tick(HASHES_PER_TICK as usize), - sender, ); // get some events diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 01e3e230a2..7852c5a256 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -290,14 +290,11 @@ impl ReplayStage { bank.tick_height(), &leader_scheduler_, ); - - if leader_id != last_leader_id { - if my_id == leader_id { - to_leader_sender.send(current_tick_height).unwrap(); - } else { - // TODO: Remove this soon once we boot the leader from ClusterInfo - cluster_info.write().unwrap().set_leader(leader_id); - } + if my_id == leader_id || my_id == last_leader_id { + to_leader_sender.send(current_tick_height).unwrap(); + } else { + // TODO: Remove this soon once we boot the leader from ClusterInfo + cluster_info.write().unwrap().set_leader(leader_id); } // Check for any slots that chain to this one diff --git a/src/tpu.rs b/src/tpu.rs index d5ffc3c359..eef211a8f3 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -203,7 +203,6 @@ impl Tpu { max_tick_height: u64, blob_index: u64, last_entry_id: &Hash, - to_validator_sender: &TpuRotationSender, blocktree: &Arc, leader_scheduler: &Arc>, ) { @@ -234,7 +233,6 @@ impl Tpu { last_entry_id, max_tick_height, self.id, - &to_validator_sender, ); let broadcast_service = BroadcastService::new(