diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index c96c52670a..2ab237c723 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -266,7 +266,7 @@ impl BankingStage { let poh = poh_recorder.lock().unwrap(); let leader_id = match poh.bank() { Some(bank) => leader_schedule_cache - .slot_leader_at_else_compute(bank.slot() + 1, &bank) + .slot_leader_at(bank.slot() + 1, Some(&bank)) .unwrap_or_default(), None => { if poh.would_be_leader(DEFAULT_TICKS_PER_SLOT) { diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index a68fcfc227..5feaac9e2c 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -131,7 +131,7 @@ pub fn process_blocktree( vec![(slot, meta, bank, entry_height, last_entry_hash)] }; - let leader_schedule_cache = LeaderScheduleCache::new(*pending_slots[0].2.epoch_schedule()); + let leader_schedule_cache = LeaderScheduleCache::new(*pending_slots[0].2.epoch_schedule(), 0); let mut fork_info = vec![]; let mut last_status_report = Instant::now(); @@ -188,6 +188,7 @@ pub fn process_blocktree( bank.freeze(); // all banks handled by this routine are created from complete slots if blocktree.is_root(slot) { + leader_schedule_cache.set_root(slot); bank.squash(); pending_slots.clear(); fork_info.clear(); @@ -219,7 +220,7 @@ pub fn process_blocktree( let next_bank = Arc::new(Bank::new_from_parent( &bank, &leader_schedule_cache - .slot_leader_at_else_compute(next_slot, &bank) + .slot_leader_at(next_slot, Some(&bank)) .unwrap(), next_slot, )); diff --git a/core/src/leader_schedule_cache.rs b/core/src/leader_schedule_cache.rs index 113a0c9fe0..569a0d42bf 100644 --- a/core/src/leader_schedule_cache.rs +++ b/core/src/leader_schedule_cache.rs @@ -15,41 +15,34 @@ pub struct LeaderScheduleCache { // Map from an epoch to a leader schedule for that epoch pub cached_schedules: RwLock, epoch_schedule: EpochSchedule, + max_epoch: RwLock, } impl LeaderScheduleCache { pub fn new_from_bank(bank: &Bank) -> Self { - Self::new(*bank.epoch_schedule()) + Self::new(*bank.epoch_schedule(), bank.slot()) } - pub fn new(epoch_schedule: EpochSchedule) -> Self { - Self { + pub fn new(epoch_schedule: EpochSchedule, root: u64) -> Self { + let cache = Self { cached_schedules: RwLock::new((HashMap::new(), VecDeque::new())), epoch_schedule, - } + max_epoch: RwLock::new(0), + }; + + cache.set_root(root); + cache } - pub fn slot_leader_at(&self, slot: u64) -> Option { - let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot); - self.cached_schedules - .read() - .unwrap() - .0 - .get(&epoch) - .map(|schedule| schedule[slot_index]) + pub fn set_root(&self, root: u64) { + *self.max_epoch.write().unwrap() = self.epoch_schedule.get_stakers_epoch(root); } - pub fn slot_leader_at_else_compute(&self, slot: u64, bank: &Bank) -> Option { - let cache_result = self.slot_leader_at(slot); - if cache_result.is_some() { - cache_result + pub fn slot_leader_at(&self, slot: u64, bank: Option<&Bank>) -> Option { + if let Some(bank) = bank { + self.slot_leader_at_else_compute(slot, bank) } else { - let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot); - if let Some(epoch_schedule) = self.compute_epoch_schedule(epoch, bank) { - Some(epoch_schedule[slot_index]) - } else { - None - } + self.slot_leader_at_no_compute(slot) } } @@ -94,6 +87,39 @@ impl LeaderScheduleCache { None } + fn slot_leader_at_no_compute(&self, slot: u64) -> Option { + let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot); + self.cached_schedules + .read() + .unwrap() + .0 + .get(&epoch) + .map(|schedule| schedule[slot_index]) + } + + fn slot_leader_at_else_compute(&self, slot: u64, bank: &Bank) -> Option { + let cache_result = self.slot_leader_at_no_compute(slot); + // Forbid asking for slots in an unconfirmed epoch + let bank_epoch = self.epoch_schedule.get_epoch_and_slot_index(slot).0; + if bank_epoch > *self.max_epoch.read().unwrap() { + error!( + "Requested leader in slot: {} of unconfirmed epoch: {}", + slot, bank_epoch + ); + return None; + } + if cache_result.is_some() { + cache_result + } else { + let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot); + if let Some(epoch_schedule) = self.compute_epoch_schedule(epoch, bank) { + Some(epoch_schedule[slot_index]) + } else { + None + } + } + } + fn get_epoch_schedule_else_compute( &self, epoch: u64, @@ -150,19 +176,17 @@ mod tests { use crate::blocktree::get_tmp_ledger_path; #[test] - fn test_slot_leader_at_else_compute() { + fn test_slot_leader_at() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Bank::new(&genesis_block); let cache = LeaderScheduleCache::new_from_bank(&bank); // Nothing in the cache, should return None - assert!(cache.slot_leader_at(bank.slot()).is_none()); + assert!(cache.slot_leader_at(bank.slot(), None).is_none()); // Add something to the cache - assert!(cache - .slot_leader_at_else_compute(bank.slot(), &bank) - .is_some()); - assert!(cache.slot_leader_at(bank.slot()).is_some()); + assert!(cache.slot_leader_at(bank.slot(), Some(&bank)).is_some()); + assert!(cache.slot_leader_at(bank.slot(), None).is_some()); assert_eq!(cache.cached_schedules.read().unwrap().0.len(), 1); } @@ -195,9 +219,9 @@ mod tests { fn run_thread_race() { let slots_per_epoch = MINIMUM_SLOT_LENGTH as u64; let epoch_schedule = EpochSchedule::new(slots_per_epoch, slots_per_epoch / 2, true); - let cache = Arc::new(LeaderScheduleCache::new(epoch_schedule)); let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); + let cache = Arc::new(LeaderScheduleCache::new(epoch_schedule, bank.slot())); let num_threads = 10; let (threads, senders): (Vec<_>, Vec<_>) = (0..num_threads) @@ -210,7 +234,7 @@ mod tests { .name("test_thread_race_leader_schedule_cache".to_string()) .spawn(move || { let _ = receiver.recv(); - cache.slot_leader_at_else_compute(bank.slot(), &bank); + cache.slot_leader_at(bank.slot(), Some(&bank)); }) .unwrap(), sender, @@ -246,9 +270,7 @@ mod tests { let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); assert_eq!( - cache - .slot_leader_at_else_compute(bank.slot(), &bank) - .unwrap(), + cache.slot_leader_at(bank.slot(), Some(&bank)).unwrap(), pubkey ); assert_eq!(cache.next_leader_slot(&pubkey, 0, &bank, None), Some(1)); @@ -294,9 +316,7 @@ mod tests { ); assert_eq!( - cache - .slot_leader_at_else_compute(bank.slot(), &bank) - .unwrap(), + cache.slot_leader_at(bank.slot(), Some(&bank)).unwrap(), pubkey ); // Check that the next leader slot after 0 is slot 1 @@ -400,4 +420,35 @@ mod tests { Some(expected_slot), ); } + + #[test] + fn test_schedule_for_unconfirmed_epoch() { + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let cache = LeaderScheduleCache::new_from_bank(&bank); + + assert_eq!(*cache.max_epoch.read().unwrap(), 1); + + // Asking for the leader for the last slot in epoch 1 is ok b/c + // epoch 1 is confirmed + assert_eq!(bank.get_epoch_and_slot_index(95).0, 1); + assert!(cache.slot_leader_at(95, Some(&bank)).is_some()); + + // Asking for the lader for the first slot in epoch 2 is not ok + // b/c epoch 2 is unconfirmed + assert_eq!(bank.get_epoch_and_slot_index(96).0, 2); + assert!(cache.slot_leader_at(96, Some(&bank)).is_none()); + + let bank2 = Bank::new_from_parent(&bank, &Pubkey::new_rand(), 95); + assert!(bank2.epoch_vote_accounts(2).is_some()); + + // Set root for a slot in epoch 1, so that epoch 2 is now confirmed + cache.set_root(95); + assert_eq!(*cache.max_epoch.read().unwrap(), 2); + assert!(cache.slot_leader_at(96, Some(&bank2)).is_some()); + assert_eq!(bank2.get_epoch_and_slot_index(223).0, 2); + assert!(cache.slot_leader_at(223, Some(&bank2)).is_some()); + assert_eq!(bank2.get_epoch_and_slot_index(224).0, 3); + assert!(cache.slot_leader_at(224, Some(&bank2)).is_none()); + } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 26dbd916cd..03e44b4360 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -157,6 +157,7 @@ impl ReplayStage { &vote_account, &cluster_info, &blocktree, + &leader_schedule_cache, )?; Self::reset_poh_recorder( @@ -235,7 +236,7 @@ impl ReplayStage { }; assert!(parent.is_frozen()); - leader_schedule_cache.slot_leader_at_else_compute(poh_slot, &parent) + leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent)) .map(|next_leader| { debug!( "me: {} leader {} at poh slot {}", @@ -308,12 +309,14 @@ impl ReplayStage { vote_account_pubkey: &Pubkey, cluster_info: &Arc>, blocktree: &Arc, + leader_schedule_cache: &Arc, ) -> Result<()> where T: 'static + KeypairUtil + Send + Sync, { if let Some(new_root) = locktower.record_vote(bank.slot()) { bank_forks.write().unwrap().set_root(new_root); + leader_schedule_cache.set_root(new_root); blocktree.set_root(new_root)?; Self::handle_new_root(&bank_forks, progress); } @@ -593,7 +596,7 @@ impl ReplayStage { continue; } let leader = leader_schedule_cache - .slot_leader_at_else_compute(child_id, &parent_bank) + .slot_leader_at(child_id, Some(&parent_bank)) .unwrap(); info!("new fork:{} parent:{}", child_id, parent_id); forks.insert(Bank::new_from_parent(&parent_bank, &leader, child_id)); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 3c6686ac21..345189ce8a 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -51,7 +51,7 @@ fn retransmit( ); for blob in &blobs { let leader = leader_schedule_cache - .slot_leader_at_else_compute(blob.read().unwrap().slot(), r_bank.as_ref()); + .slot_leader_at(blob.read().unwrap().slot(), Some(r_bank.as_ref())); if blob.read().unwrap().meta.forward { ClusterInfo::retransmit_to(&cluster_info, &neighbors, blob, leader, sock, true)?; ClusterInfo::retransmit_to(&cluster_info, &children, blob, leader, sock, false)?; diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 15523aefb3..6ff25ccf66 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -85,10 +85,10 @@ fn should_retransmit_and_persist( my_id: &Pubkey, ) -> bool { let slot_leader_id = match bank { - None => leader_schedule_cache.and_then(|cache| cache.slot_leader_at(blob.slot())), + None => leader_schedule_cache.and_then(|cache| cache.slot_leader_at(blob.slot(), None)), Some(bank) => match leader_schedule_cache { None => slot_leader_at(blob.slot(), &bank), - Some(cache) => cache.slot_leader_at_else_compute(blob.slot(), bank), + Some(cache) => cache.slot_leader_at(blob.slot(), Some(bank)), }, };