diff --git a/src/fullnode.rs b/src/fullnode.rs index b09ad1f4e0..026160cff5 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -26,7 +26,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender, SyncSender}; use std::sync::{Arc, RwLock}; -use std::thread::{spawn, Result}; +use std::thread::{sleep, spawn, Result}; use std::time::{Duration, Instant}; struct NodeServices { @@ -66,6 +66,7 @@ pub struct FullnodeConfig { pub storage_rotate_count: u64, pub leader_scheduler_config: LeaderSchedulerConfig, pub ledger_config: BlocktreeConfig, + pub tick_config: PohServiceConfig, } impl Default for FullnodeConfig { fn default() -> Self { @@ -80,6 +81,7 @@ impl Default for FullnodeConfig { storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE, leader_scheduler_config: LeaderSchedulerConfig::default(), ledger_config: BlocktreeConfig::default(), + tick_config: PohServiceConfig::default(), } } } @@ -315,11 +317,16 @@ impl Fullnode { fn leader_to_validator(&mut self, tick_height: u64) -> FullnodeReturnType { trace!( - "leader_to_validator({:?}): tick_height={}", + "leader_to_validator({:?}): tick_height={}, bt: {}", self.id, tick_height, + self.bank.tick_height() ); + while self.bank.tick_height() < tick_height { + sleep(Duration::from_millis(10)); + } + let scheduled_leader = { let mut leader_scheduler = self.bank.leader_scheduler.write().unwrap(); @@ -338,7 +345,7 @@ impl Fullnode { if scheduled_leader == self.id { debug!("node is still the leader"); - let last_entry_id = self.node_services.tvu.get_state(); + let last_entry_id = self.bank.last_id(); self.validator_to_leader(tick_height, last_entry_id); FullnodeReturnType::LeaderToLeaderRotation } else { @@ -554,11 +561,17 @@ impl Service for Fullnode { #[cfg(test)] mod tests { use super::*; + use crate::blob_fetch_stage::BlobFetchStage; use crate::blocktree::{create_tmp_sample_ledger, tmp_copy_ledger}; use crate::entry::make_consecutive_blobs; + use crate::entry::EntrySlice; + use crate::gossip_service::{converge, make_listening_node}; use crate::leader_scheduler::make_active_set_entries; use crate::streamer::responder; + use std::cmp::min; use std::fs::remove_dir_all; + use std::sync::atomic::Ordering; + use std::thread::sleep; #[test] fn validator_exit() { @@ -854,13 +867,12 @@ mod tests { } #[test] - #[ignore] // TODO: Make this test less hacky fn test_tvu_behind() { solana_logger::setup(); // Make leader node let ticks_per_slot = 5; - let slots_per_epoch = 2; + let slots_per_epoch = 1; let leader_keypair = Arc::new(Keypair::new()); let validator_keypair = Arc::new(Keypair::new()); @@ -885,6 +897,18 @@ mod tests { slots_per_epoch, ticks_per_slot * slots_per_epoch, )); + let config = PohServiceConfig::Sleep(Duration::from_millis(200)); + fullnode_config.tick_config = config; + + info!("Start up a listener"); + let blob_receiver_exit = Arc::new(AtomicBool::new(false)); + let (_, _, mut listening_node, _) = make_listening_node(&leader_node.info); + let (blob_fetch_sender, blob_fetch_receiver) = channel(); + let blob_fetch_stage = BlobFetchStage::new( + Arc::new(listening_node.sockets.tvu.pop().unwrap()), + &blob_fetch_sender, + blob_receiver_exit.clone(), + ); let voting_keypair = VotingKeypair::new_local(&leader_keypair); info!("Start the bootstrap leader"); @@ -897,38 +921,50 @@ mod tests { &fullnode_config, ); - info!("Hold Tvu bank lock to prevent tvu from making progress"); + let (rotation_sender, rotation_receiver) = channel(); + + info!("Pause the Tvu"); + let pause_tvu = leader.node_services.tvu.get_pause(); + pause_tvu.store(true, Ordering::Relaxed); + + // Wait for convergence + converge(&leader_node_info, 2); + + info!("Wait for leader -> validator transition"); + let signal = leader + .to_validator_receiver + .recv() + .expect("signal for leader -> validator transition"); + let (rn_sender, rn_receiver) = channel(); + rn_sender.send(signal).expect("send"); + leader.to_validator_receiver = rn_receiver; + + info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)"); { let w_last_ids = leader.bank.last_ids().write().unwrap(); - - info!("Wait for leader -> validator transition"); - let signal = leader - .to_validator_receiver - .recv() - .expect("signal for leader -> validator transition"); - let (rn_sender, rn_receiver) = channel(); - rn_sender.send(signal).expect("send"); - leader.to_validator_receiver = rn_receiver; - - info!("Make sure the tvu bank is behind"); - assert_eq!(w_last_ids.tick_height, 2); + assert!(w_last_ids.tick_height < ticks_per_slot - 1); } - // Release tvu bank lock, tvu should start making progress again and should signal a - // rotate. After rotation it will still be the slot leader as a new leader schedule has - // not been computed yet (still in epoch 0) - info!("Release tvu bank lock"); - let (rotation_sender, rotation_receiver) = channel(); + // Clear the blobs we've recieved so far. After this rotation, we should + // no longer receive blobs from slot 0 + while let Ok(_) = blob_fetch_receiver.try_recv() {} + let leader_exit = leader.run(Some(rotation_sender)); + + // Wait for leader_to_validator() function execution to trigger a leader to leader rotation + sleep(Duration::from_millis(1000)); + + // Tvu bank lock is released here, so tvu should start making progress again and should signal a + // rotation. After rotation it will still be the slot leader as a new leader schedule has + // not been computed yet (still in epoch 0). In the next epoch (epoch 1), the node will + // transition to a validator. + info!("Unpause the Tvu"); + pause_tvu.store(false, Ordering::Relaxed); let expected_rotations = vec![ (FullnodeReturnType::LeaderToLeaderRotation, ticks_per_slot), - ( - FullnodeReturnType::LeaderToLeaderRotation, - 2 * ticks_per_slot, - ), ( FullnodeReturnType::LeaderToValidatorRotation, - 3 * ticks_per_slot, + 2 * ticks_per_slot, ), ]; @@ -943,6 +979,28 @@ mod tests { info!("Shut down"); leader_exit(); + + // Make sure that after rotation we don't receive any blobs from slot 0 (make sure + // broadcast started again at the correct place) + while let Ok(new_blobs) = blob_fetch_receiver.try_recv() { + for blob in new_blobs { + assert_ne!(blob.read().unwrap().slot(), 0); + } + } + + // Check the ledger to make sure the PoH chains + { + let blocktree = Blocktree::open(&leader_ledger_path).unwrap(); + let entries: Vec<_> = (0..3) + .flat_map(|slot_height| blocktree.get_slot_entries(slot_height, 0, None).unwrap()) + .collect(); + + assert!(entries[1..].verify(&entries[0].id)) + } + + blob_receiver_exit.store(true, Ordering::Relaxed); + blob_fetch_stage.join().unwrap(); + Blocktree::destroy(&leader_ledger_path).expect("Expected successful database destruction"); let _ignored = remove_dir_all(&leader_ledger_path).unwrap(); } @@ -989,8 +1047,10 @@ mod tests { let non_tick_active_entries_len = active_set_entries.len() - num_ending_ticks as usize; let remaining_ticks_in_zeroth_slot = ticks_per_block - num_genesis_ticks; - let entries_for_zeroth_slot = - non_tick_active_entries_len + remaining_ticks_in_zeroth_slot as usize; + let entries_for_zeroth_slot = min( + active_set_entries.len(), + non_tick_active_entries_len + remaining_ticks_in_zeroth_slot as usize, + ); let entry_chunks: Vec<_> = active_set_entries[entries_for_zeroth_slot..] .chunks(ticks_per_block as usize) .collect(); diff --git a/src/gossip_service.rs b/src/gossip_service.rs index 7608de03a3..62915e828b 100644 --- a/src/gossip_service.rs +++ b/src/gossip_service.rs @@ -1,14 +1,18 @@ //! The `gossip_service` module implements the network control plane. use crate::blocktree::Blocktree; -use crate::cluster_info::ClusterInfo; +use crate::cluster_info::{ClusterInfo, Node, NodeInfo}; use crate::service::Service; use crate::streamer; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; +use std::thread::sleep; use std::thread::{self, JoinHandle}; +use std::time::Duration; pub struct GossipService { exit: Arc, @@ -51,6 +55,82 @@ impl GossipService { } } +pub fn make_listening_node( + leader: &NodeInfo, +) -> (GossipService, Arc>, Node, Pubkey) { + let keypair = Keypair::new(); + let exit = Arc::new(AtomicBool::new(false)); + let new_node = Node::new_localhost_with_pubkey(keypair.pubkey()); + let new_node_info = new_node.info.clone(); + let id = new_node.info.id; + let mut new_node_cluster_info = ClusterInfo::new_with_keypair(new_node_info, Arc::new(keypair)); + new_node_cluster_info.insert_info(leader.clone()); + new_node_cluster_info.set_leader(leader.id); + let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info)); + let gossip_service = GossipService::new( + &new_node_cluster_info_ref, + None, + new_node + .sockets + .gossip + .try_clone() + .expect("Failed to clone gossip"), + exit.clone(), + ); + + (gossip_service, new_node_cluster_info_ref, new_node, id) +} + +pub fn converge(node: &NodeInfo, num_nodes: usize) -> Vec { + info!("Wait for convergence with {} nodes", num_nodes); + // Let's spy on the network + let (gossip_service, spy_ref, id) = make_spy_node(node); + trace!( + "converge spy_node {} looking for at least {} nodes", + id, + num_nodes + ); + + // Wait for the cluster to converge + for _ in 0..15 { + let rpc_peers = spy_ref.read().unwrap().rpc_peers(); + if rpc_peers.len() == num_nodes { + debug!("converge found {} nodes: {:?}", rpc_peers.len(), rpc_peers); + gossip_service.close().unwrap(); + return rpc_peers; + } + debug!( + "converge found {} nodes, need {} more", + rpc_peers.len(), + num_nodes - rpc_peers.len() + ); + sleep(Duration::new(1, 0)); + } + panic!("Failed to converge"); +} + +pub fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc>, Pubkey) { + let keypair = Keypair::new(); + let exit = Arc::new(AtomicBool::new(false)); + let mut spy = Node::new_localhost_with_pubkey(keypair.pubkey()); + let id = spy.info.id; + let daddr = "0.0.0.0:0".parse().unwrap(); + spy.info.tvu = daddr; + spy.info.rpc = daddr; + let mut spy_cluster_info = ClusterInfo::new_with_keypair(spy.info, Arc::new(keypair)); + spy_cluster_info.insert_info(leader.clone()); + spy_cluster_info.set_leader(leader.id); + let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info)); + let gossip_service = GossipService::new( + &spy_cluster_info_ref, + None, + spy.sockets.gossip, + exit.clone(), + ); + + (gossip_service, spy_cluster_info_ref, id) +} + impl Service for GossipService { type JoinReturnType = (); diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 286f775fb6..9690c37dec 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -24,7 +24,11 @@ use solana_sdk::vote_transaction::VoteTransaction; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::{channel, Receiver, SyncSender}; use std::sync::{Arc, RwLock}; +#[cfg(test)] +use std::thread::sleep; use std::thread::{self, Builder, JoinHandle}; +#[cfg(test)] +use std::time::Duration; use std::time::Instant; pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; @@ -51,6 +55,8 @@ pub struct ReplayStage { t_replay: JoinHandle<()>, exit: Arc, ledger_signal_sender: SyncSender, + #[cfg(test)] + pause: Arc, } impl ReplayStage { @@ -90,11 +96,12 @@ impl ReplayStage { duration_as_ms(&now.elapsed()) as usize ); + let num_ticks = bank.tick_height(); let mut num_ticks_to_next_vote = bank .leader_scheduler .read() .unwrap() - .num_ticks_left_in_slot(bank.tick_height()); + .num_ticks_left_in_slot(num_ticks); for (i, entry) in entries.iter().enumerate() { inc_new_counter_info!("replicate-stage_bank-tick", bank.tick_height() as usize); @@ -188,7 +195,12 @@ impl ReplayStage { ) -> (Self, EntryReceiver) { let (ledger_entry_sender, ledger_entry_receiver) = channel(); let mut entry_stream = entry_stream.cloned().map(EntryStream::new); - + #[cfg(test)] + let (pause, pause_) = { + let pause = Arc::new(AtomicBool::new(false)); + let pause_ = pause.clone(); + (pause, pause_) + }; let exit_ = exit.clone(); let t_replay = Builder::new() .name("solana-replay-stage".to_string()) @@ -215,6 +227,10 @@ impl ReplayStage { if exit_.load(Ordering::Relaxed) { break; } + #[cfg(test)] + while pause_.load(Ordering::Relaxed) { + sleep(Duration::from_millis(200)); + } if current_slot.is_none() { let new_slot = Self::get_next_slot( @@ -309,11 +325,18 @@ impl ReplayStage { t_replay, exit, ledger_signal_sender, + #[cfg(test)] + pause, }, ledger_entry_receiver, ) } + #[cfg(test)] + pub fn get_pause(&self) -> Arc { + self.pause.clone() + } + pub fn close(self) -> thread::Result<()> { self.exit(); self.join() diff --git a/src/tvu.rs b/src/tvu.rs index 84239698b6..9d1cb42141 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -158,6 +158,11 @@ impl Tvu { ) } + #[cfg(test)] + pub fn get_pause(&self) -> Arc { + self.replay_stage.get_pause() + } + pub fn get_state(&self) -> Hash { *self.last_entry_id.read().unwrap() } diff --git a/tests/multinode.rs b/tests/multinode.rs index d64689fb10..b6825a15ba 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -3,10 +3,10 @@ use solana::blob_fetch_stage::BlobFetchStage; use solana::blocktree::{create_tmp_sample_ledger, tmp_copy_ledger}; use solana::blocktree::{Blocktree, BlocktreeConfig, DEFAULT_SLOT_HEIGHT}; use solana::client::mk_client; -use solana::cluster_info::{ClusterInfo, Node, NodeInfo}; +use solana::cluster_info::{Node, NodeInfo}; use solana::entry::{reconstruct_entries_from_blobs, Entry}; use solana::fullnode::{new_bank_from_ledger, Fullnode, FullnodeConfig, FullnodeReturnType}; -use solana::gossip_service::GossipService; +use solana::gossip_service::{converge, make_listening_node}; use solana::leader_scheduler::{make_active_set_entries, LeaderSchedulerConfig}; use solana::packet::SharedBlob; use solana::result; @@ -34,82 +34,6 @@ fn read_ledger(ledger_path: &str) -> Vec { .collect() } -fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc>, Pubkey) { - let keypair = Keypair::new(); - let exit = Arc::new(AtomicBool::new(false)); - let mut spy = Node::new_localhost_with_pubkey(keypair.pubkey()); - let id = spy.info.id.clone(); - let daddr = "0.0.0.0:0".parse().unwrap(); - spy.info.tvu = daddr; - spy.info.rpc = daddr; - let mut spy_cluster_info = ClusterInfo::new_with_keypair(spy.info, Arc::new(keypair)); - spy_cluster_info.insert_info(leader.clone()); - spy_cluster_info.set_leader(leader.id); - let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info)); - let gossip_service = GossipService::new( - &spy_cluster_info_ref, - None, - spy.sockets.gossip, - exit.clone(), - ); - - (gossip_service, spy_cluster_info_ref, id) -} - -fn make_listening_node( - leader: &NodeInfo, -) -> (GossipService, Arc>, Node, Pubkey) { - let keypair = Keypair::new(); - let exit = Arc::new(AtomicBool::new(false)); - let new_node = Node::new_localhost_with_pubkey(keypair.pubkey()); - let new_node_info = new_node.info.clone(); - let id = new_node.info.id.clone(); - let mut new_node_cluster_info = ClusterInfo::new_with_keypair(new_node_info, Arc::new(keypair)); - new_node_cluster_info.insert_info(leader.clone()); - new_node_cluster_info.set_leader(leader.id); - let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info)); - let gossip_service = GossipService::new( - &new_node_cluster_info_ref, - None, - new_node - .sockets - .gossip - .try_clone() - .expect("Failed to clone gossip"), - exit.clone(), - ); - - (gossip_service, new_node_cluster_info_ref, new_node, id) -} - -fn converge(node: &NodeInfo, num_nodes: usize) -> Vec { - info!("Wait for convergence with {} nodes", num_nodes); - // Let's spy on the network - let (gossip_service, spy_ref, id) = make_spy_node(node); - trace!( - "converge spy_node {} looking for at least {} nodes", - id, - num_nodes - ); - - // Wait for the cluster to converge - for _ in 0..15 { - let rpc_peers = spy_ref.read().unwrap().rpc_peers(); - if rpc_peers.len() == num_nodes { - debug!("converge found {} nodes: {:?}", rpc_peers.len(), rpc_peers); - gossip_service.close().unwrap(); - return rpc_peers; - } - debug!( - "converge found {} nodes, need {} more", - rpc_peers.len(), - num_nodes - rpc_peers.len() - ); - sleep(Duration::new(1, 0)); - } - panic!("Failed to converge"); -} - #[test] fn test_multi_node_ledger_window() -> result::Result<()> { solana_logger::setup();