diff --git a/src/fullnode.rs b/src/fullnode.rs index 42d5d07cd8..e7e620ce9e 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -521,18 +521,13 @@ impl Service for Fullnode { #[cfg(test)] mod tests { use super::*; - use crate::blob_fetch_stage::BlobFetchStage; use crate::blocktree::{ create_tmp_sample_ledger, tmp_copy_ledger, BlocktreeConfig, DEFAULT_SLOT_HEIGHT, }; use crate::entry::make_consecutive_blobs; - use crate::entry::EntrySlice; - use crate::gossip_service::{converge, make_listening_node}; use crate::leader_scheduler::make_active_set_entries; use crate::streamer::responder; use std::fs::remove_dir_all; - use std::sync::atomic::Ordering; - use std::thread::sleep; #[test] fn validator_exit() { @@ -854,137 +849,6 @@ mod tests { let _ignored = remove_dir_all(&validator_ledger_path).unwrap(); } - #[test] - fn test_tvu_behind() { - solana_logger::setup(); - - // Make leader node - let ticks_per_slot = 5; - let slots_per_epoch = 1; - let leader_keypair = Arc::new(Keypair::new()); - let validator_keypair = Arc::new(Keypair::new()); - - info!("leader: {:?}", leader_keypair.pubkey()); - info!("validator: {:?}", validator_keypair.pubkey()); - - let mut fullnode_config = FullnodeConfig::default(); - fullnode_config.leader_scheduler_config = LeaderSchedulerConfig::new( - ticks_per_slot, - slots_per_epoch, - ticks_per_slot * slots_per_epoch, - ); - let config = PohServiceConfig::Sleep(Duration::from_millis(200)); - fullnode_config.tick_config = config; - - let (leader_node, _, leader_ledger_path, _, _) = setup_leader_validator( - &leader_keypair, - &validator_keypair, - 1, - 0, - "test_tvu_behind", - &fullnode_config.ledger_config(), - ); - - let leader_node_info = leader_node.info.clone(); - - info!("Start up a listener"); - let blob_receiver_exit = Arc::new(AtomicBool::new(false)); - let (_, _, mut listening_node, _) = make_listening_node(&leader_node.info); - let (blob_fetch_sender, blob_fetch_receiver) = channel(); - let blob_fetch_stage = BlobFetchStage::new( - Arc::new(listening_node.sockets.tvu.pop().unwrap()), - &blob_fetch_sender, - blob_receiver_exit.clone(), - ); - - let voting_keypair = VotingKeypair::new_local(&leader_keypair); - info!("Start the bootstrap leader"); - let leader = Fullnode::new( - leader_node, - &leader_keypair, - &leader_ledger_path, - voting_keypair, - Some(&leader_node_info), - &fullnode_config, - ); - - let (rotation_sender, rotation_receiver) = channel(); - - info!("Pause the Tvu"); - let pause_tvu = leader.node_services.tvu.get_pause(); - pause_tvu.store(true, Ordering::Relaxed); - - // Wait for convergence - converge(&leader_node_info, 2); - - // Wait for Tpu bank to progress while the Tvu bank is stuck - sleep(Duration::from_millis(1000)); - - info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)"); - { - let w_last_ids = leader.bank.last_ids().write().unwrap(); - assert!(w_last_ids.tick_height < ticks_per_slot - 1); - } - - // Clear the blobs we've received so far. After this rotation, we should - // no longer receive blobs from slot 0 - while let Ok(_) = blob_fetch_receiver.try_recv() {} - - let leader_exit = leader.run(Some(rotation_sender)); - - // Wait for Tpu bank to progress while the Tvu bank is stuck - sleep(Duration::from_millis(1000)); - - // Tvu bank lock is released here, so tvu should start making progress again and should signal a - // rotation. After rotation it will still be the slot leader as a new leader schedule has - // not been computed yet (still in epoch 0). In the next epoch (epoch 1), the node will - // transition to a validator. - info!("Unpause the Tvu"); - pause_tvu.store(false, Ordering::Relaxed); - let expected_rotations = vec![( - FullnodeReturnType::LeaderToValidatorRotation, - ticks_per_slot, - )]; - - for expected_rotation in expected_rotations { - loop { - let transition = rotation_receiver.recv().unwrap(); - info!("leader transition: {:?}", transition); - assert_eq!(expected_rotation, transition); - break; - } - } - - info!("Shut down"); - leader_exit(); - - // Make sure that after rotation we don't receive any blobs from slot 0 (make sure - // broadcast started again at the correct place) - while let Ok(new_blobs) = blob_fetch_receiver.try_recv() { - for blob in new_blobs { - assert_ne!(blob.read().unwrap().slot(), 0); - } - } - - // Check the ledger to make sure the PoH chains - { - let blocktree = - Blocktree::open_config(&leader_ledger_path, &fullnode_config.ledger_config()) - .unwrap(); - let entries: Vec<_> = (0..3) - .flat_map(|slot_height| blocktree.get_slot_entries(slot_height, 0, None).unwrap()) - .collect(); - - assert!(entries[1..].verify(&entries[0].id)) - } - - blob_receiver_exit.store(true, Ordering::Relaxed); - blob_fetch_stage.join().unwrap(); - - Blocktree::destroy(&leader_ledger_path).expect("Expected successful database destruction"); - let _ignored = remove_dir_all(&leader_ledger_path).unwrap(); - } - fn setup_leader_validator( leader_keypair: &Arc, validator_keypair: &Arc, diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 6a9ff332ae..5c708371cc 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -22,8 +22,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; -#[cfg(test)] -use std::thread::sleep; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; @@ -51,8 +49,6 @@ impl Drop for Finalizer { pub struct ReplayStage { t_replay: JoinHandle<()>, exit: Arc, - #[cfg(test)] - pause: Arc, } impl ReplayStage { @@ -184,12 +180,6 @@ impl ReplayStage { leader_scheduler: &Arc>, ) -> (Self, EntryReceiver) { let (ledger_entry_sender, ledger_entry_receiver) = channel(); - #[cfg(test)] - let (pause, pause_) = { - let pause = Arc::new(AtomicBool::new(false)); - let pause_ = pause.clone(); - (pause, pause_) - }; let exit_ = exit.clone(); let leader_scheduler_ = leader_scheduler.clone(); let to_leader_sender = to_leader_sender.clone(); @@ -219,10 +209,6 @@ impl ReplayStage { if exit_.load(Ordering::Relaxed) { break; } - #[cfg(test)] - while pause_.load(Ordering::Relaxed) { - sleep(Duration::from_millis(200)); - } let timer = Duration::from_millis(100); let e = ledger_signal_receiver.recv_timeout(timer); match e { @@ -308,20 +294,7 @@ impl ReplayStage { }) .unwrap(); - ( - Self { - t_replay, - exit, - #[cfg(test)] - pause, - }, - ledger_entry_receiver, - ) - } - - #[cfg(test)] - pub fn get_pause(&self) -> Arc { - self.pause.clone() + (Self { t_replay, exit }, ledger_entry_receiver) } pub fn close(self) -> thread::Result<()> { diff --git a/src/tvu.rs b/src/tvu.rs index 4d46d1bce2..d5c541eda7 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -167,11 +167,6 @@ impl Tvu { } } - #[cfg(test)] - pub fn get_pause(&self) -> Arc { - self.replay_stage.get_pause() - } - pub fn get_state(&self) -> Hash { *self.last_entry_id.read().unwrap() }