diff --git a/src/fullnode.rs b/src/fullnode.rs index 46143c3480..7f446d8ccc 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -13,13 +13,11 @@ use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use crate::storage_stage::StorageState; use crate::tpu::Tpu; -use crate::tvu::{Sockets, Tvu, TvuRotationReceiver}; +use crate::tvu::{Sockets, Tvu, TvuRotationInfo, TvuRotationReceiver}; use crate::voting_keypair::VotingKeypair; use log::Level; use solana_metrics::counter::Counter; -use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; -use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::timing::timestamp; @@ -100,7 +98,6 @@ pub struct Fullnode { rpc_service: Option, rpc_pubsub_service: Option, gossip_service: GossipService, - bank_forks: Arc>, sigverify_disabled: bool, tpu_sockets: Vec, broadcast_socket: UdpSocket, @@ -224,46 +221,10 @@ impl Fullnode { // Setup channel for rotation indications let (rotation_sender, rotation_receiver) = channel(); - // TODO: All this into Tpu/ReplayStage.... - if bank_forks_info.len() != 1 { - warn!("TODO: figure out what to do with multiple bank forks"); - } - let (bank, entry_height, last_entry_id) = { - let mut bank_forks = bank_forks.write().unwrap(); - bank_forks.set_working_bank_id(bank_forks_info[0].bank_id); - ( - bank_forks.working_bank(), - bank_forks_info[0].entry_height, - bank_forks_info[0].last_entry_id, - ) - }; - - // Figure which node should generate the next tick - let (next_leader, next_slot) = { - let next_tick = bank.tick_height() + 1; - - let leader_scheduler = leader_scheduler.read().unwrap(); - let next_slot = leader_scheduler.tick_height_to_slot(next_tick); - - let next_leader = leader_scheduler - .get_leader_for_slot(next_slot) - .expect("Leader not known after processing bank"); - - trace!( - "node {:?} scheduled as leader for slot {}", - next_leader, - next_slot, - ); - - (next_leader, next_slot) - }; - // END TODO - let tvu = Tvu::new( voting_keypair_option, &bank_forks, - entry_height, - last_entry_id, + &bank_forks_info, &cluster_info, sockets, blocktree.clone(), @@ -277,9 +238,9 @@ impl Fullnode { ); let tpu = Tpu::new(id, &cluster_info); - let mut fullnode = Self { + inc_new_counter_info!("fullnode-new", 1); + Self { id, - bank_forks, sigverify_disabled: config.sigverify_disabled, gossip_service, rpc_service: Some(rpc_service), @@ -291,22 +252,19 @@ impl Fullnode { rotation_receiver, blocktree, leader_scheduler, - }; - - // TODO: This first rotate should come from the Tvu/ReplayStage - fullnode.rotate(&bank, next_leader, next_slot, &last_entry_id); - inc_new_counter_info!("fullnode-new", 1); - fullnode + } } - fn rotate( - &mut self, - bank: &Arc, - leader: Pubkey, - slot: u64, - last_entry_id: &Hash, - ) -> FullnodeReturnType { - if leader == self.id { + fn rotate(&mut self, rotation_info: TvuRotationInfo) -> FullnodeReturnType { + trace!( + "{:?}: rotate for slot={} to leader={:?} using last_entry_id={:?}", + self.id, + rotation_info.slot, + rotation_info.leader_id, + rotation_info.last_entry_id, + ); + + if rotation_info.leader_id == self.id { let transition = match self.node_services.tpu.is_leader() { Some(was_leader) => { if was_leader { @@ -319,9 +277,8 @@ impl Fullnode { } None => FullnodeReturnType::LeaderToLeaderRotation, // value doesn't matter here... }; - let tpu_bank = Arc::new(Bank::new_from_parent(bank, &leader)); self.node_services.tpu.switch_to_leader( - &tpu_bank, + Arc::new(rotation_info.bank), PohServiceConfig::default(), self.tpu_sockets .iter() @@ -331,8 +288,8 @@ impl Fullnode { .try_clone() .expect("Failed to clone broadcast socket"), self.sigverify_disabled, - slot, - last_entry_id, + rotation_info.slot, + rotation_info.last_entry_id, &self.blocktree, &self.leader_scheduler, ); @@ -340,7 +297,7 @@ impl Fullnode { } else { debug!("{:?} rotating to validator role", self.id); self.node_services.tpu.switch_to_forwarder( - leader, + rotation_info.leader_id, self.tpu_sockets .iter() .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) @@ -368,20 +325,9 @@ impl Fullnode { } match self.rotation_receiver.recv_timeout(timeout) { - Ok((bank_id, slot, leader)) => { - trace!( - "{:?}: rotate for slot={} to leader={:?} using bank={}", - self.id, - slot, - leader, - bank_id - ); - - // TODO: Uncomment next line once `bank_id` has a valid id in it - //self.bank_forks.write().set_working_bank_id(bank_id); - let bank = self.bank_forks.read().unwrap().working_bank(); - - let transition = self.rotate(&bank, leader, slot, &bank.last_id()); + Ok(rotation_info) => { + let slot = rotation_info.slot; + let transition = self.rotate(rotation_info); debug!("role transition complete: {:?}", transition); if let Some(ref rotation_notifier) = rotation_notifier { rotation_notifier.send((transition, slot)).unwrap(); @@ -417,7 +363,7 @@ impl Fullnode { } #[allow(clippy::trivially_copy_pass_by_ref)] -fn new_banks_from_blocktree( +pub fn new_banks_from_blocktree( blocktree_path: &str, blocktree_config: &BlocktreeConfig, leader_scheduler: &Arc>, @@ -440,34 +386,6 @@ fn new_banks_from_blocktree( ) } -// TODO: Remove this function from tests -#[allow(clippy::trivially_copy_pass_by_ref)] -pub fn new_bank_from_ledger( - ledger_path: &str, - ledger_config: &BlocktreeConfig, - leader_scheduler: &Arc>, -) -> (Arc, u64, Hash, Blocktree, Receiver) { - let (mut bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = - new_banks_from_blocktree(ledger_path, ledger_config, leader_scheduler); - - // This helper won't handle multiple banks - assert_eq!(bank_forks_info.len(), 1); - bank_forks.set_working_bank_id(bank_forks_info[0].bank_id); - let (working_bank, entry_height, last_entry_id) = ( - bank_forks.working_bank(), - bank_forks_info[0].entry_height, - bank_forks_info[0].last_entry_id, - ); - - ( - working_bank, - entry_height, - last_entry_id, - blocktree, - ledger_signal_receiver, - ) -} - impl Service for Fullnode { type JoinReturnType = (); @@ -494,6 +412,7 @@ mod tests { use crate::entry::make_consecutive_blobs; use crate::leader_scheduler::make_active_set_entries; use crate::streamer::responder; + use solana_sdk::hash::Hash; use std::fs::remove_dir_all; #[test] @@ -634,6 +553,10 @@ mod tests { // Wait for the bootstrap leader to transition. Since there are no other nodes in the // cluster it will continue to be the leader + assert_eq!( + rotation_receiver.recv().unwrap(), + (FullnodeReturnType::LeaderToLeaderRotation, 0) + ); assert_eq!( rotation_receiver.recv().unwrap(), (FullnodeReturnType::LeaderToLeaderRotation, 1) @@ -689,8 +612,12 @@ mod tests { Some(&bootstrap_leader_info), &fullnode_config, ); - - assert!(!bootstrap_leader.node_services.tpu.is_leader().unwrap()); + let (rotation_sender, rotation_receiver) = channel(); + let bootstrap_leader_exit = bootstrap_leader.run(Some(rotation_sender)); + assert_eq!( + rotation_receiver.recv().unwrap(), + (FullnodeReturnType::LeaderToValidatorRotation, 2) + ); // Test that a node knows to transition to a leader based on parsing the ledger let validator = Fullnode::new( @@ -702,11 +629,15 @@ mod tests { &fullnode_config, ); - assert!(validator.node_services.tpu.is_leader().unwrap()); - validator.close().expect("Expected leader node to close"); - bootstrap_leader - .close() - .expect("Expected validator node to close"); + let (rotation_sender, rotation_receiver) = channel(); + let validator_exit = validator.run(Some(rotation_sender)); + assert_eq!( + rotation_receiver.recv().unwrap(), + (FullnodeReturnType::LeaderToLeaderRotation, 2) + ); + + validator_exit(); + bootstrap_leader_exit(); } for path in ledger_paths { Blocktree::destroy(&path).expect("Expected successful database destruction"); @@ -792,11 +723,13 @@ mod tests { // Close the validator so that rocksdb has locks available validator_exit(); let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default())); - let (bank, entry_height, _, _, _) = new_bank_from_ledger( + let (bank_forks, bank_forks_info, _, _) = new_banks_from_blocktree( &validator_ledger_path, &BlocktreeConfig::default(), &leader_scheduler, ); + let bank = bank_forks.working_bank(); + let entry_height = bank_forks_info[0].entry_height; assert!(bank.tick_height() >= leader_scheduler.read().unwrap().ticks_per_epoch); diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 92aa36160c..70c609164f 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -1,7 +1,8 @@ //! The `replay_stage` replays transactions broadcast by the leader. +use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; -use crate::blocktree_processor; +use crate::blocktree_processor::{self, BankForksInfo}; use crate::cluster_info::ClusterInfo; use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; use crate::leader_scheduler::LeaderScheduler; @@ -9,7 +10,7 @@ use crate::packet::BlobError; use crate::result::{Error, Result}; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; -use crate::tvu::TvuRotationSender; +use crate::tvu::{TvuRotationInfo, TvuRotationSender}; use crate::voting_keypair::VotingKeypair; use log::Level; use solana_metrics::counter::Counter; @@ -172,10 +173,10 @@ impl ReplayStage { my_id: Pubkey, voting_keypair: Option>, blocktree: Arc, - bank: Arc, + bank_forks: &Arc>, + bank_forks_info: &[BankForksInfo], cluster_info: Arc>, exit: Arc, - last_entry_id: Hash, to_leader_sender: &TvuRotationSender, ledger_signal_receiver: Receiver, leader_scheduler: &Arc>, @@ -185,9 +186,42 @@ impl ReplayStage { let exit_ = exit.clone(); let leader_scheduler_ = leader_scheduler.clone(); let to_leader_sender = to_leader_sender.clone(); - let last_entry_id = Arc::new(RwLock::new(last_entry_id)); let subscriptions_ = subscriptions.clone(); + let (bank, last_entry_id) = { + let mut bank_forks = bank_forks.write().unwrap(); + bank_forks.set_working_bank_id(bank_forks_info[0].bank_id); + (bank_forks.working_bank(), bank_forks_info[0].last_entry_id) + }; + let last_entry_id = Arc::new(RwLock::new(last_entry_id)); + + let mut current_blob_index = { + let leader_scheduler = leader_scheduler.read().unwrap(); + let slot = leader_scheduler.tick_height_to_slot(bank.tick_height() + 1); + + let leader_id = leader_scheduler + .get_leader_for_slot(slot) + .expect("Leader not known after processing bank"); + trace!("node {:?} scheduled as leader for slot {}", leader_id, slot,); + + // Send a rotation notification back to Fullnode to initialize the TPU to the right + // state + to_leader_sender + .send(TvuRotationInfo { + bank: Bank::new_from_parent(&bank, &leader_id), + last_entry_id: *last_entry_id.read().unwrap(), + slot, + leader_id, + }) + .unwrap(); + + blocktree + .meta(slot) + .expect("Database error") + .map(|meta| meta.consumed) + .unwrap_or(0) + }; + let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { @@ -209,11 +243,6 @@ impl ReplayStage { + leader_scheduler.num_ticks_left_in_slot(first_tick_in_current_slot), ) }; - let mut current_blob_index = blocktree - .meta(current_slot.unwrap()) - .expect("Database error") - .map(|meta| meta.consumed) - .unwrap_or(0); // Loop through blocktree MAX_ENTRY_RECV_PER_ITER entries at a time for each // relevant slot to see if there are any available updates @@ -298,10 +327,12 @@ impl ReplayStage { if my_id == leader_id || my_id == last_leader_id { to_leader_sender - .send(( - 0, // TODO: fix hard coded bank_id - next_slot, leader_id, - )) + .send(TvuRotationInfo { + bank: Bank::new_from_parent(&bank, &leader_id), + last_entry_id: *last_entry_id.read().unwrap(), + slot: next_slot, + leader_id, + }) .unwrap(); } else if leader_id != last_leader_id { // TODO: Remove this soon once we boot the leader from ClusterInfo @@ -355,7 +386,7 @@ mod test { use crate::cluster_info::{ClusterInfo, Node}; use crate::entry::create_ticks; use crate::entry::{next_entry_mut, Entry}; - use crate::fullnode::new_bank_from_ledger; + use crate::fullnode::new_banks_from_blocktree; use crate::leader_scheduler::{ make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig, }; @@ -438,24 +469,23 @@ mod test { { // Set up the bank let blocktree_config = BlocktreeConfig::new(ticks_per_slot); - let (bank, _entry_height, last_entry_id, blocktree, l_receiver) = - new_bank_from_ledger(&my_ledger_path, &blocktree_config, &leader_scheduler); + let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = + new_banks_from_blocktree(&my_ledger_path, &blocktree_config, &leader_scheduler); // Set up the replay stage let (rotation_sender, rotation_receiver) = channel(); - let meta = blocktree.meta(0).unwrap().unwrap(); let exit = Arc::new(AtomicBool::new(false)); let blocktree = Arc::new(blocktree); let (replay_stage, ledger_writer_recv) = ReplayStage::new( my_id, Some(Arc::new(voting_keypair)), blocktree.clone(), - bank.clone(), + &Arc::new(RwLock::new(bank_forks)), + &bank_forks_info, Arc::new(RwLock::new(cluster_info_me)), exit.clone(), - last_entry_id, &rotation_sender, - l_receiver, + ledger_signal_receiver, &leader_scheduler, &Arc::new(RpcSubscriptions::default()), ); @@ -468,6 +498,7 @@ mod test { } // Write the entries to the ledger, replay_stage should get notified of changes + let meta = blocktree.meta(0).unwrap().unwrap(); blocktree .write_entries( DEFAULT_SLOT_HEIGHT, @@ -478,12 +509,15 @@ mod test { .unwrap(); info!("Wait for replay_stage to exit and check return value is correct"); + let rotation_info = rotation_receiver + .recv() + .expect("should have signaled leader rotation"); assert_eq!( - (0, 2, my_keypair.pubkey()), - rotation_receiver - .recv() - .expect("should have signaled leader rotation"), + rotation_info.last_entry_id, + bank_forks_info[0].last_entry_id ); + assert_eq!(rotation_info.slot, 2); + assert_eq!(rotation_info.leader_id, my_keypair.pubkey()); info!("Check that the entries on the ledger writer channel are correct"); let mut received_ticks = ledger_writer_recv @@ -539,24 +573,27 @@ mod test { let exit = Arc::new(AtomicBool::new(false)); let my_keypair = Arc::new(my_keypair); let voting_keypair = Arc::new(VotingKeypair::new_local(&my_keypair)); - let (to_leader_sender, _) = channel(); + let (to_leader_sender, _to_leader_receiver) = channel(); { let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default())); - let (bank, entry_height, last_entry_id, blocktree, l_receiver) = new_bank_from_ledger( + let (bank_forks, bank_forks_info, blocktree, l_receiver) = new_banks_from_blocktree( &my_ledger_path, &BlocktreeConfig::default(), &leader_scheduler, ); + let bank = bank_forks.working_bank(); + let entry_height = bank_forks_info[0].entry_height; + let last_entry_id = bank_forks_info[0].last_entry_id; let blocktree = Arc::new(blocktree); let (replay_stage, ledger_writer_recv) = ReplayStage::new( my_keypair.pubkey(), Some(voting_keypair.clone()), blocktree.clone(), - bank.clone(), + &Arc::new(RwLock::new(bank_forks)), + &bank_forks_info, cluster_info_me.clone(), exit.clone(), - last_entry_id, &to_leader_sender, l_receiver, &leader_scheduler, @@ -661,12 +698,12 @@ mod test { let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); // Set up the replay stage - let (rotation_tx, rotation_rx) = channel(); + let (rotation_sender, rotation_receiver) = channel(); let exit = Arc::new(AtomicBool::new(false)); { - let (bank, _entry_height, last_entry_id, blocktree, l_receiver) = - new_bank_from_ledger(&my_ledger_path, &blocktree_config, &leader_scheduler); - + let (bank_forks, bank_forks_info, blocktree, l_receiver) = + new_banks_from_blocktree(&my_ledger_path, &blocktree_config, &leader_scheduler); + let bank = bank_forks.working_bank(); let meta = blocktree .meta(0) .unwrap() @@ -678,11 +715,11 @@ mod test { my_keypair.pubkey(), Some(voting_keypair.clone()), blocktree.clone(), - bank.clone(), + &Arc::new(RwLock::new(bank_forks)), + &bank_forks_info, cluster_info_me.clone(), exit.clone(), - last_entry_id, - &rotation_tx, + &rotation_sender, l_receiver, &leader_scheduler, &Arc::new(RpcSubscriptions::default()), @@ -720,12 +757,15 @@ mod test { } // Wait for replay_stage to exit and check return value is correct + let rotation_info = rotation_receiver + .recv() + .expect("should have signaled leader rotation"); assert_eq!( - (0, 1, my_keypair.pubkey()), - rotation_rx - .recv() - .expect("should have signaled leader rotation") + rotation_info.last_entry_id, + bank_forks_info[0].last_entry_id ); + assert_eq!(rotation_info.slot, 1); + assert_eq!(rotation_info.leader_id, my_keypair.pubkey()); assert_ne!(expected_last_id, Hash::default()); //replay stage should continue running even after rotation has happened (tvu never goes down) diff --git a/src/tpu.rs b/src/tpu.rs index 8c9ae9efd8..b9f2376cf5 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -191,13 +191,13 @@ impl Tpu { #[allow(clippy::too_many_arguments)] pub fn switch_to_leader( &mut self, - bank: &Arc, + bank: Arc, tick_duration: PohServiceConfig, transactions_sockets: Vec, broadcast_socket: UdpSocket, sigverify_disabled: bool, slot: u64, - last_entry_id: &Hash, + last_entry_id: Hash, blocktree: &Arc, leader_scheduler: &Arc>, ) { @@ -234,13 +234,13 @@ impl Tpu { &bank, verified_receiver, tick_duration, - last_entry_id, + &last_entry_id, max_tick_height, self.id, ); let broadcast_service = BroadcastService::new( - bank.clone(), + bank, broadcast_socket, self.cluster_info.clone(), blob_index, diff --git a/src/tvu.rs b/src/tvu.rs index 574d6be54f..c383ac279d 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -16,6 +16,7 @@ use crate::bank_forks::BankForks; use crate::blob_fetch_stage::BlobFetchStage; use crate::blockstream_service::BlockstreamService; use crate::blocktree::Blocktree; +use crate::blocktree_processor::BankForksInfo; use crate::cluster_info::ClusterInfo; use crate::leader_scheduler::LeaderScheduler; use crate::replay_stage::ReplayStage; @@ -24,6 +25,7 @@ use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use crate::storage_stage::{StorageStage, StorageState}; use crate::voting_keypair::VotingKeypair; +use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -33,13 +35,14 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, RwLock}; use std::thread; -pub type TvuReturnType = ( - u64, // bank_id, - u64, // slot height to initiate a rotation - Pubkey, // leader upon rotation -); -pub type TvuRotationSender = Sender; -pub type TvuRotationReceiver = Receiver; +pub struct TvuRotationInfo { + pub bank: Bank, // Bank to use + pub last_entry_id: Hash, // last_entry_id of that bank + pub slot: u64, // slot height to initiate a rotation + pub leader_id: Pubkey, // leader upon rotation +} +pub type TvuRotationSender = Sender; +pub type TvuRotationReceiver = Receiver; pub struct Tvu { fetch_stage: BlobFetchStage, @@ -60,18 +63,14 @@ impl Tvu { /// This service receives messages from a leader in the network and processes the transactions /// on the bank state. /// # Arguments - /// * `bank` - The bank state. - /// * `entry_height` - Initial ledger height - /// * `last_entry_id` - Hash of the last entry /// * `cluster_info` - The cluster_info state. - /// * `sockets` - My fetch, repair, and restransmit sockets + /// * `sockets` - fetch, repair, and retransmit sockets /// * `blocktree` - the ledger itself #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)] pub fn new( voting_keypair: Option>, bank_forks: &Arc>, - entry_height: u64, - last_entry_id: Hash, + bank_forks_info: &[BankForksInfo], cluster_info: &Arc>, sockets: Sockets, blocktree: Arc, @@ -119,15 +118,14 @@ impl Tvu { exit.clone(), ); - let bank = bank_forks.read().unwrap().working_bank(); let (replay_stage, mut previous_receiver) = ReplayStage::new( keypair.pubkey(), voting_keypair, blocktree.clone(), - bank.clone(), + &bank_forks, + &bank_forks_info, cluster_info.clone(), exit.clone(), - last_entry_id, to_leader_sender, ledger_signal_receiver, &leader_scheduler, @@ -138,7 +136,7 @@ impl Tvu { let (blockstream_service, blockstream_receiver) = BlockstreamService::new( previous_receiver, blockstream.unwrap().to_string(), - bank.tick_height(), + bank_forks.read().unwrap().working_bank().tick_height(), // TODO: BlockstreamService needs to deal with BankForks somehow still leader_scheduler, exit.clone(), ); @@ -154,7 +152,7 @@ impl Tvu { Some(blocktree), &keypair, &exit.clone(), - entry_height, + bank_forks_info[0].entry_height, // TODO: StorageStage needs to deal with BankForks somehow still storage_rotate_count, &cluster_info, ); @@ -210,6 +208,7 @@ pub mod tests { use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; + use solana_sdk::hash::Hash; #[test] fn test_tvu_exit() { @@ -222,6 +221,11 @@ pub mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(starting_balance); let bank_forks = BankForks::new(0, Bank::new(&genesis_block)); + let bank_forks_info = vec![BankForksInfo { + bank_id: 0, + entry_height: 0, + last_entry_id: Hash::default(), + }]; let leader_scheduler_config = LeaderSchedulerConfig::default(); let leader_scheduler = LeaderScheduler::new_with_bank(&leader_scheduler_config, &bank_forks.working_bank()); @@ -233,7 +237,6 @@ pub mod tests { cluster_info1.set_leader(leader.info.id); let cref1 = Arc::new(RwLock::new(cluster_info1)); - let cur_hash = Hash::default(); let blocktree_path = get_tmp_ledger_path("test_tvu_exit"); let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path) .expect("Expected to successfully open ledger"); @@ -243,8 +246,7 @@ pub mod tests { let tvu = Tvu::new( Some(Arc::new(voting_keypair)), &Arc::new(RwLock::new(bank_forks)), - 0, - cur_hash, + &bank_forks_info, &cref1, { Sockets { diff --git a/tests/multinode.rs b/tests/multinode.rs index a9781b7d82..9c0ea02ca8 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -6,7 +6,7 @@ use solana::blocktree::{ use solana::client::mk_client; use solana::cluster_info::{Node, NodeInfo}; use solana::entry::{reconstruct_entries_from_blobs, Entry}; -use solana::fullnode::{new_bank_from_ledger, Fullnode, FullnodeConfig, FullnodeReturnType}; +use solana::fullnode::{new_banks_from_blocktree, Fullnode, FullnodeConfig, FullnodeReturnType}; use solana::gossip_service::{converge, make_listening_node}; use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; use solana::result; @@ -507,8 +507,11 @@ fn test_boot_validator_from_file() -> result::Result<()> { ); ledger_paths.push(genesis_ledger_path.clone()); - let leader_ledger_path = - tmp_copy_ledger(&genesis_ledger_path, "multi_node_basic", &blocktree_config); + let leader_ledger_path = tmp_copy_ledger( + &genesis_ledger_path, + "boot_validator_from_file", + &blocktree_config, + ); ledger_paths.push(leader_ledger_path.clone()); let leader_data = leader.info.clone(); @@ -521,12 +524,16 @@ fn test_boot_validator_from_file() -> result::Result<()> { None, &fullnode_config, ); + let leader_fullnode_exit = leader_fullnode.run(None); + + info!("Sending transaction to leader"); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap(); assert_eq!(leader_balance, 500); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(1000)).unwrap(); assert_eq!(leader_balance, 1000); + info!("Leader balance verified"); let keypair = Arc::new(Keypair::new()); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); @@ -546,12 +553,16 @@ fn test_boot_validator_from_file() -> result::Result<()> { Some(&leader_data), &fullnode_config, ); + let val_fullnode_exit = val_fullnode.run(None); + + info!("Checking validator balance"); let mut client = mk_client(&validator_data); let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance)); assert!(getbal == Some(leader_balance)); + info!("Validator balance verified"); - val_fullnode.close()?; - leader_fullnode.close()?; + val_fullnode_exit(); + leader_fullnode_exit(); for path in ledger_paths { remove_dir_all(path)?; @@ -604,6 +615,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { let voting_keypair = VotingKeypair::new_local(&leader_keypair); let (leader_data, leader_fullnode) = create_leader(&ledger_path, leader_keypair.clone(), voting_keypair); + let leader_fullnode_exit = leader_fullnode.run(None); // lengthen the ledger let leader_balance = @@ -612,7 +624,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { assert_eq!(leader_balance, 500); // restart the leader - leader_fullnode.close()?; + leader_fullnode_exit(); } // create a "stale" ledger by copying current ledger @@ -626,6 +638,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { let voting_keypair = VotingKeypair::new_local(&leader_keypair); let (leader_data, leader_fullnode) = create_leader(&ledger_path, leader_keypair.clone(), voting_keypair); + let leader_fullnode_exit = leader_fullnode.run(None); // lengthen the ledger let leader_balance = @@ -634,12 +647,13 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { assert_eq!(leader_balance, 1000); // restart the leader - leader_fullnode.close()?; + leader_fullnode_exit(); } let voting_keypair = VotingKeypair::new_local(&leader_keypair); let (leader_data, leader_fullnode) = create_leader(&ledger_path, leader_keypair, voting_keypair); + let leader_fullnode_exit = leader_fullnode.run(None); // start validator from old ledger let keypair = Arc::new(Keypair::new()); @@ -655,6 +669,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { Some(&leader_data), &fullnode_config, ); + let val_fullnode_exit = val_fullnode.run(None); // trigger broadcast, validator should catch up from leader, whose window contains // the entries missing from the stale ledger @@ -677,8 +692,8 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(expected)); assert_eq!(getbal, Some(expected)); - val_fullnode.close()?; - leader_fullnode.close()?; + val_fullnode_exit(); + leader_fullnode_exit(); remove_dir_all(ledger_path)?; remove_dir_all(stale_ledger_path)?; @@ -989,7 +1004,10 @@ fn test_leader_to_validator_transition() { let (rotation_sender, rotation_receiver) = channel(); let leader_exit = leader.run(Some(rotation_sender)); - let expected_rotations = vec![(FullnodeReturnType::LeaderToValidatorRotation, 1)]; + let expected_rotations = vec![ + (FullnodeReturnType::LeaderToLeaderRotation, 0), + (FullnodeReturnType::LeaderToValidatorRotation, 1), + ]; for expected_rotation in expected_rotations { loop { @@ -1004,12 +1022,13 @@ fn test_leader_to_validator_transition() { leader_exit(); info!("Check the ledger to make sure it's the right height..."); - let bank = new_bank_from_ledger( + let bank_forks = new_banks_from_blocktree( &leader_ledger_path, &BlocktreeConfig::default(), &Arc::new(RwLock::new(LeaderScheduler::default())), ) .0; + let bank = bank_forks.working_bank(); assert_eq!( bank.tick_height(), @@ -1122,10 +1141,18 @@ fn test_leader_validator_basic() { converge(&leader_info, 2); info!("Waiting for slot 0 -> slot 1: bootstrap leader and the validator rotate"); + assert_eq!( + leader_rotation_receiver.recv().unwrap(), + (FullnodeReturnType::LeaderToLeaderRotation, 0), + ); assert_eq!( leader_rotation_receiver.recv().unwrap(), (FullnodeReturnType::LeaderToValidatorRotation, 1) ); + assert_eq!( + validator_rotation_receiver.recv().unwrap(), + (FullnodeReturnType::LeaderToValidatorRotation, 0) + ); assert_eq!( validator_rotation_receiver.recv().unwrap(), (FullnodeReturnType::ValidatorToLeaderRotation, 1) @@ -1501,6 +1528,7 @@ fn test_full_leader_validator_network() { for node in nodes { node.1(); } + info!("Bootstrap leader exit"); bootstrap_leader_exit(); let mut node_entries = vec![]; @@ -1654,7 +1682,7 @@ fn test_broadcast_last_tick() { loop { let transition = bootstrap_leader_rotation_receiver.recv().unwrap(); info!("bootstrap leader transition event: {:?}", transition); - if transition.0 == FullnodeReturnType::LeaderToLeaderRotation { + if (FullnodeReturnType::LeaderToLeaderRotation, 1) == transition { break; } } @@ -1846,7 +1874,7 @@ fn test_fullnode_rotate( last_entry_id = entries.last().unwrap().id; } - let mut leader_tick_height_of_next_rotation = ticks_per_slot; + let mut leader_tick_height_of_next_rotation = 0; let mut leader_should_be_leader = true; if fullnode_config.leader_scheduler_config.ticks_per_slot == 1 { // Add another tick to the ledger if the cluster has been configured for 1 tick_per_slot. @@ -1857,7 +1885,7 @@ fn test_fullnode_rotate( entries.extend(tick); last_entry_id = entries.last().unwrap().id; - leader_tick_height_of_next_rotation += 2; + leader_tick_height_of_next_rotation = 2; if include_validator { leader_should_be_leader = false; } diff --git a/tests/tvu.rs b/tests/tvu.rs index a20376e64e..6fd638b59a 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -1,7 +1,7 @@ use log::trace; use solana::bank_forks::BankForks; -use solana::blocktree::Blocktree; -use solana::blocktree::{get_tmp_ledger_path, BlocktreeConfig}; +use solana::blocktree::{get_tmp_ledger_path, Blocktree, BlocktreeConfig}; +use solana::blocktree_processor::BankForksInfo; use solana::cluster_info::{ClusterInfo, Node}; use solana::entry::next_entry_mut; use solana::entry::EntrySlice; @@ -86,7 +86,14 @@ fn test_replay() { let (genesis_block, mint_keypair) = GenesisBlock::new(starting_balance); let tvu_addr = target1.info.tvu; + let mut cur_hash = Hash::default(); let bank_forks = BankForks::new(0, Bank::new(&genesis_block)); + let bank_forks_info = vec![BankForksInfo { + bank_id: 0, + entry_height: 0, + last_entry_id: cur_hash, + }]; + let bank = bank_forks.working_bank(); let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new_with_bank( &leader_scheduler_config, @@ -101,20 +108,18 @@ fn test_replay() { let cref1 = Arc::new(RwLock::new(cluster_info1)); let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, exit.clone()); - let mut cur_hash = Hash::default(); let blocktree_path = get_tmp_ledger_path("test_replay"); - let (blocktree, l_receiver) = + let (blocktree, ledger_signal_receiver) = Blocktree::open_with_config_signal(&blocktree_path, &blocktree_config) .expect("Expected to successfully open ledger"); let vote_account_keypair = Arc::new(Keypair::new()); let voting_keypair = VotingKeypair::new_local(&vote_account_keypair); - let (sender, _) = channel(); + let (to_leader_sender, _to_leader_receiver) = channel(); let tvu = Tvu::new( Some(Arc::new(voting_keypair)), &Arc::new(RwLock::new(bank_forks)), - 0, - cur_hash, + &bank_forks_info, &cref1, { Sockets { @@ -125,10 +130,10 @@ fn test_replay() { }, Arc::new(blocktree), STORAGE_ROTATE_TEST_COUNT, - &sender, + &to_leader_sender, &StorageState::default(), None, - l_receiver, + ledger_signal_receiver, leader_scheduler, &Arc::new(RpcSubscriptions::default()), );