diff --git a/src/fullnode.rs b/src/fullnode.rs index 6d231d84fc..e1efcd95d2 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -28,7 +28,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, RwLock}; -use std::thread::{sleep, spawn, Result}; +use std::thread::{spawn, Result}; use std::time::Duration; struct NodeServices { @@ -127,19 +127,9 @@ impl Fullnode { let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new( &config.leader_scheduler_config, ))); - let (mut bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = + let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = new_banks_from_blocktree(ledger_path, &config.ledger_config(), &leader_scheduler); - if bank_forks_info.len() != 1 { - warn!("TODO: figure out what to do with multiple bank forks"); - } - bank_forks.set_working_bank_id(bank_forks_info[0].bank_id); - let (bank, entry_height, last_entry_id) = ( - bank_forks.working_bank(), - bank_forks_info[0].entry_height, - bank_forks_info[0].last_entry_id, - ); - info!("node info: {:?}", node.info); info!("node entrypoint_info: {:?}", entrypoint_info_option); info!( @@ -206,35 +196,6 @@ impl Fullnode { .insert_info(entrypoint_info.clone()); } - // Figure which node should generate the next tick - let (scheduled_leader, max_tick_height, blob_index) = { - let next_tick = bank.tick_height() + 1; - - let leader_scheduler = leader_scheduler.read().unwrap(); - let slot_at_next_tick = leader_scheduler.tick_height_to_slot(next_tick); - - let scheduled_leader = leader_scheduler - .get_leader_for_slot(slot_at_next_tick) - .expect("Leader not known after processing bank"); - let max_tick_height = next_tick + leader_scheduler.num_ticks_left_in_slot(next_tick); - let blob_index = - if let Some(meta) = blocktree.meta(slot_at_next_tick).expect("Database error") { - meta.consumed - } else { - 0 - }; - - trace!( - "node {:?} scheduled as leader for ticks ({},{}), starting blob_index={}", - scheduled_leader, - next_tick, - max_tick_height, - blob_index, - ); - - (scheduled_leader, max_tick_height, blob_index) - }; - let sockets = Sockets { repair: node .sockets @@ -263,6 +224,48 @@ impl Fullnode { // Setup channel for rotation indications let (rotation_sender, rotation_receiver) = channel(); + // TODO: All this into Tpu/ReplayStage.... + if bank_forks_info.len() != 1 { + warn!("TODO: figure out what to do with multiple bank forks"); + } + let (bank, entry_height, last_entry_id) = { + let mut bank_forks = bank_forks.write().unwrap(); + bank_forks.set_working_bank_id(bank_forks_info[0].bank_id); + ( + bank_forks.working_bank(), + bank_forks_info[0].entry_height, + bank_forks_info[0].last_entry_id, + ) + }; + + // Figure which node should generate the next tick + let (next_leader, next_slot, blob_index) = { + let next_tick = bank.tick_height() + 1; + + let leader_scheduler = leader_scheduler.read().unwrap(); + let next_slot = leader_scheduler.tick_height_to_slot(next_tick); + + let next_leader = leader_scheduler + .get_leader_for_slot(next_slot) + .expect("Leader not known after processing bank"); + let blob_index = if let Some(meta) = blocktree.meta(next_slot).expect("Database error") + { + meta.consumed + } else { + 0 + }; + + trace!( + "node {:?} scheduled as leader for slot {}, starting blob_index={}", + next_leader, + next_slot, + blob_index, + ); + + (next_leader, next_slot, blob_index) + }; + // END TODO + let tvu = Tvu::new( voting_keypair_option, &bank, @@ -298,67 +301,21 @@ impl Fullnode { leader_scheduler, }; - fullnode.rotate( - &bank, - scheduled_leader, - max_tick_height, - blob_index, - &last_entry_id, - ); + // TODO: This first rotate should come from the Tvu/ReplayStage + fullnode.rotate(&bank, next_leader, next_slot, blob_index, &last_entry_id); inc_new_counter_info!("fullnode-new", 1); fullnode } - fn get_next_leader(&self, bank: &Arc, tick_height: u64) -> (Pubkey, u64) { - loop { - let bank_tick_height = bank.tick_height(); - if bank_tick_height >= tick_height { - break; - } - trace!( - "{:?}: Waiting for bank tick_height to catch up from {} to {}", - self.id, - bank_tick_height, - tick_height - ); - sleep(Duration::from_millis(10)); - } - - let (scheduled_leader, max_tick_height) = { - let mut leader_scheduler = self.leader_scheduler.write().unwrap(); - - // A transition is only permitted on the final tick of a slot - assert_eq!(leader_scheduler.num_ticks_left_in_slot(tick_height), 0); - let first_tick_of_next_slot = tick_height + 1; - - leader_scheduler.update_tick_height(first_tick_of_next_slot, bank); - let slot = leader_scheduler.tick_height_to_slot(first_tick_of_next_slot); - ( - leader_scheduler.get_leader_for_slot(slot).unwrap(), - first_tick_of_next_slot - + leader_scheduler.num_ticks_left_in_slot(first_tick_of_next_slot), - ) - }; - - debug!( - "node {:?} scheduled as leader for ticks [{}, {})", - scheduled_leader, - tick_height + 1, - max_tick_height - ); - - (scheduled_leader, max_tick_height) - } - fn rotate( &mut self, bank: &Arc, - next_leader: Pubkey, - max_tick_height: u64, + leader: Pubkey, + slot: u64, blob_index: u64, last_entry_id: &Hash, ) -> FullnodeReturnType { - if next_leader == self.id { + if leader == self.id { let transition = match self.node_services.tpu.is_leader() { Some(was_leader) => { if was_leader { @@ -371,6 +328,7 @@ impl Fullnode { } None => FullnodeReturnType::LeaderToLeaderRotation, // value doesn't matter here... }; + let tpu_bank = Arc::new(Bank::new_from_parent(bank)); self.node_services.tpu.switch_to_leader( &tpu_bank, @@ -383,7 +341,7 @@ impl Fullnode { .try_clone() .expect("Failed to clone broadcast socket"), self.sigverify_disabled, - max_tick_height, + slot, blob_index, last_entry_id, &self.blocktree, @@ -393,7 +351,7 @@ impl Fullnode { } else { debug!("{:?} rotating to validator role", self.id); self.node_services.tpu.switch_to_forwarder( - next_leader, + leader, self.tpu_sockets .iter() .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) @@ -421,17 +379,23 @@ impl Fullnode { } match self.rotation_receiver.recv_timeout(timeout) { - Ok(tick_height) => { - trace!("{:?}: rotate at tick_height={}", self.id, tick_height); + Ok((bank_id, slot, leader)) => { + trace!( + "{:?}: rotate for slot={} to leader={:?} using bank={}", + self.id, + slot, + leader, + bank_id + ); + + // TODO: Uncomment next line once `bank_id` has a valid id in it + //self.bank_forks.write().set_working_bank_id(bank_id); let bank = self.bank_forks.read().unwrap().working_bank(); - let (next_leader, max_tick_height) = self.get_next_leader(&bank, tick_height); - let transition = - self.rotate(&bank, next_leader, max_tick_height, 0, &bank.last_id()); + + let transition = self.rotate(&bank, leader, slot, 0, &bank.last_id()); debug!("role transition complete: {:?}", transition); if let Some(ref rotation_notifier) = rotation_notifier { - rotation_notifier - .send((transition, tick_height + 1)) - .unwrap(); + rotation_notifier.send((transition, slot)).unwrap(); } } Err(RecvTimeoutError::Timeout) => continue, @@ -684,7 +648,7 @@ mod tests { // cluster it will continue to be the leader assert_eq!( rotation_receiver.recv().unwrap(), - (FullnodeReturnType::LeaderToLeaderRotation, ticks_per_slot) + (FullnodeReturnType::LeaderToLeaderRotation, 1) ); bootstrap_leader_exit(); } diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 2680765516..6a2325493e 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -297,14 +297,23 @@ impl ReplayStage { // for leader rotation if max_tick_height_for_slot == current_tick_height { // Check for leader rotation - let leader_id = leader_scheduler_ - .read() - .unwrap() - .get_leader_for_tick(current_tick_height + 1) - .unwrap(); + let (leader_id, next_slot) = { + let leader_scheduler = leader_scheduler_.read().unwrap(); + ( + leader_scheduler + .get_leader_for_tick(current_tick_height + 1) + .unwrap(), + leader_scheduler.tick_height_to_slot(current_tick_height + 1), + ) + }; if my_id == leader_id || my_id == last_leader_id { - to_leader_sender.send(current_tick_height).unwrap(); + to_leader_sender + .send(( + 0, // TODO: fix hard coded bank_id + next_slot, leader_id, + )) + .unwrap(); } else if leader_id != last_leader_id { // TODO: Remove this soon once we boot the leader from ClusterInfo cluster_info.write().unwrap().set_leader(leader_id); @@ -482,7 +491,7 @@ mod test { info!("Wait for replay_stage to exit and check return value is correct"); assert_eq!( - 2 * ticks_per_slot - 1, + (0, 2, my_keypair.pubkey()), rotation_receiver .recv() .expect("should have signaled leader rotation"), @@ -726,7 +735,7 @@ mod test { // Wait for replay_stage to exit and check return value is correct assert_eq!( - active_window_tick_length, + (0, 1, my_keypair.pubkey()), rotation_rx .recv() .expect("should have signaled leader rotation") diff --git a/src/tpu.rs b/src/tpu.rs index 0274b96fd8..539f626a98 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -196,7 +196,7 @@ impl Tpu { transactions_sockets: Vec, broadcast_socket: UdpSocket, sigverify_disabled: bool, - max_tick_height: u64, + slot: u64, blob_index: u64, last_entry_id: &Hash, blocktree: &Arc, @@ -222,6 +222,10 @@ impl Tpu { let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); + // TODO: Fix BankingStage/BroadcastService to operate on `slot` directly instead of + // `max_tick_height` + let max_tick_height = (slot + 1) * leader_scheduler.read().unwrap().ticks_per_slot - 1; + let (banking_stage, entry_receiver) = BankingStage::new( &bank, verified_receiver, diff --git a/src/tvu.rs b/src/tvu.rs index 707669deab..f2523b9cbe 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -25,6 +25,7 @@ use crate::storage_stage::{StorageStage, StorageState}; use crate::voting_keypair::VotingKeypair; use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; +use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -32,7 +33,11 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, RwLock}; use std::thread; -pub type TvuReturnType = u64; // tick_height to initiate a rotation +pub type TvuReturnType = ( + u64, // bank_id, + u64, // slot height to initiate a rotation + Pubkey, // leader upon rotation +); pub type TvuRotationSender = Sender; pub type TvuRotationReceiver = Receiver; diff --git a/tests/multinode.rs b/tests/multinode.rs index a80b03dba9..c2a4f94342 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -989,10 +989,7 @@ fn test_leader_to_validator_transition() { let (rotation_sender, rotation_receiver) = channel(); let leader_exit = leader.run(Some(rotation_sender)); - let expected_rotations = vec![( - FullnodeReturnType::LeaderToValidatorRotation, - ticks_per_slot, - )]; + let expected_rotations = vec![(FullnodeReturnType::LeaderToValidatorRotation, 1)]; for expected_rotation in expected_rotations { loop { @@ -1127,35 +1124,23 @@ fn test_leader_validator_basic() { info!("Waiting for slot 0 -> slot 1: bootstrap leader and the validator rotate"); assert_eq!( leader_rotation_receiver.recv().unwrap(), - ( - FullnodeReturnType::LeaderToValidatorRotation, - ticks_per_slot, - ) + (FullnodeReturnType::LeaderToValidatorRotation, 1) ); assert_eq!( validator_rotation_receiver.recv().unwrap(), - ( - FullnodeReturnType::ValidatorToLeaderRotation, - ticks_per_slot, - ) + (FullnodeReturnType::ValidatorToLeaderRotation, 1) ); info!("Waiting for slot 1 -> slot 2: validator remains the slot leader due to no votes"); assert_eq!( validator_rotation_receiver.recv().unwrap(), - ( - FullnodeReturnType::LeaderToLeaderRotation, - ticks_per_slot * 2, - ) + (FullnodeReturnType::LeaderToLeaderRotation, 2) ); info!("Waiting for slot 2 -> slot 3: validator remains the slot leader due to no votes"); assert_eq!( validator_rotation_receiver.recv().unwrap(), - ( - FullnodeReturnType::LeaderToLeaderRotation, - ticks_per_slot * 3, - ) + (FullnodeReturnType::LeaderToLeaderRotation, 3) ); info!("Shut down"); @@ -1953,13 +1938,10 @@ fn test_fullnode_rotate( // Check for leader rotation { match leader_rotation_receiver.try_recv() { - Ok((rotation_type, tick_height)) => { - info!( - "leader rotation event {:?} at tick_height={}", - rotation_type, tick_height - ); + Ok((rotation_type, slot)) => { + info!("leader rotation event {:?} at slot={}", rotation_type, slot); info!("leader should be leader? {}", leader_should_be_leader); - assert_eq!(tick_height, leader_tick_height_of_next_rotation); + assert_eq!(slot, leader_tick_height_of_next_rotation / ticks_per_slot); if include_validator { assert_eq!( rotation_type, @@ -1983,13 +1965,16 @@ fn test_fullnode_rotate( // Check for validator rotation if include_validator { match validator_rotation_receiver.try_recv() { - Ok((rotation_type, tick_height)) => { + Ok((rotation_type, slot)) => { info!( - "validator rotation event {:?} at tick_height={}", - rotation_type, tick_height + "validator rotation event {:?} at slot={}", + rotation_type, slot ); info!("validator should be leader? {}", validator_should_be_leader); - assert_eq!(tick_height, validator_tick_height_of_next_rotation); + assert_eq!( + slot, + validator_tick_height_of_next_rotation / ticks_per_slot + ); assert_eq!( rotation_type, if validator_should_be_leader {