From 0ffd91df27528346a795edf9f649b782a8d784f5 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Wed, 17 Jul 2019 14:10:15 -0700 Subject: [PATCH] groom poh_recorder (#5127) * groom poh_recorder * fixup * nits * slot() from the outside means "the slot the recorder is working on" * remove redundant check * review comments, put next_tick back in the "is reset" check * remove redundant check --- core/src/poh_recorder.rs | 208 ++++++++++++++------------------------- core/src/replay_stage.rs | 17 ++-- 2 files changed, 83 insertions(+), 142 deletions(-) diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 7a6603a4a9..f589531ee3 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -13,7 +13,6 @@ use crate::blocktree::Blocktree; use crate::entry::Entry; use crate::leader_schedule_cache::LeaderScheduleCache; -use crate::leader_schedule_utils; use crate::poh::Poh; use crate::result::{Error, Result}; use solana_runtime::bank::Bank; @@ -27,7 +26,7 @@ use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; use std::sync::{Arc, Mutex}; use std::time::Instant; -const MAX_LAST_LEADER_GRACE_TICKS_FACTOR: u64 = 2; +const GRACE_TICKS_FACTOR: u64 = 2; #[derive(Debug, PartialEq, Eq, Clone)] pub enum PohRecorderError { @@ -49,14 +48,14 @@ pub struct PohRecorder { pub poh: Arc>, tick_height: u64, clear_bank_signal: Option>, - start_slot: Slot, - start_tick: u64, + start_slot: Slot, // parent slot + start_tick: u64, // first tick this recorder will observe tick_cache: Vec<(Entry, u64)>, working_bank: Option, sender: Sender, start_leader_at_tick: Option, last_leader_tick: u64, // zero if none - max_last_leader_grace_ticks: u64, + grace_ticks: u64, id: Pubkey, blocktree: Arc, leader_schedule_cache: Arc, @@ -76,9 +75,9 @@ impl PohRecorder { ); assert_eq!(self.ticks_per_slot, bank.ticks_per_slot()); let (start_leader_at_tick, last_leader_tick) = Self::compute_leader_slot_ticks( - &next_leader_slot, + next_leader_slot, self.ticks_per_slot, - self.max_last_leader_grace_ticks, + self.grace_ticks, ); self.start_leader_at_tick = start_leader_at_tick; self.last_leader_tick = last_leader_tick; @@ -89,32 +88,21 @@ impl PohRecorder { } pub fn would_be_leader(&self, within_next_n_ticks: u64) -> bool { - let close_to_leader_tick = self.start_leader_at_tick.map_or(false, |leader_tick| { - let leader_ideal_start_tick = - leader_tick.saturating_sub(self.max_last_leader_grace_ticks); + self.has_bank() + || self.start_leader_at_tick.map_or(false, |leader_tick| { + let ideal_leader_tick = leader_tick.saturating_sub(self.grace_ticks); - self.tick_height <= self.last_leader_tick - && self.tick_height >= leader_ideal_start_tick.saturating_sub(within_next_n_ticks) - }); - - self.working_bank.is_some() || close_to_leader_tick - } - - pub fn next_slot_leader(&self) -> Option { - let slot = - leader_schedule_utils::tick_height_to_slot(self.ticks_per_slot, self.tick_height); - self.leader_schedule_cache.slot_leader_at(slot + 1, None) + self.tick_height <= self.last_leader_tick + && self.tick_height >= ideal_leader_tick.saturating_sub(within_next_n_ticks) + }) } pub fn leader_after_slots(&self, slots: u64) -> Option { - let slot = - leader_schedule_utils::tick_height_to_slot(self.ticks_per_slot, self.tick_height); self.leader_schedule_cache - .slot_leader_at(slot + slots, None) + .slot_leader_at(self.tick_height / self.ticks_per_slot + slots, None) } - - pub fn start_slot(&self) -> Slot { - self.start_slot + pub fn next_slot_leader(&self) -> Option { + self.leader_after_slots(1) } pub fn bank(&self) -> Option> { @@ -135,39 +123,40 @@ impl PohRecorder { /// returns if leader tick has reached, how many grace ticks were afforded, /// imputed leader_slot and self.start_slot + /// reached_leader_tick() == true means "ready for a bank" pub fn reached_leader_tick(&self) -> (bool, u64, Slot, Slot) { - let slot = - leader_schedule_utils::tick_height_to_slot(self.ticks_per_slot, self.tick_height); - trace!( - "tick_height {}, start_tick {} start_leader_at_tick {:?}, grace {}", + "tick_height {}, start_tick {}, start_leader_at_tick {:?}, grace {}, has_bank {}", self.tick_height, self.start_tick, self.start_leader_at_tick, - self.max_last_leader_grace_ticks + self.grace_ticks, + self.has_bank() ); + let next_tick = self.tick_height + 1; + if let Some(target_tick) = self.start_leader_at_tick { - let leader_ideal_start_tick = - target_tick.saturating_sub(self.max_last_leader_grace_ticks); - // Check if either grace period has expired, - // or target tick is = grace period (i.e. poh recorder was just reset) - if self.tick_height >= target_tick - || self.max_last_leader_grace_ticks >= target_tick.saturating_sub(self.start_tick) - { + // we've reached target_tick OR poh was reset to run immediately + if next_tick >= target_tick || self.start_tick + self.grace_ticks == target_tick { + assert!(next_tick >= self.start_tick); + let ideal_target_tick = target_tick.saturating_sub(self.grace_ticks); + return ( true, - self.tick_height.saturating_sub(leader_ideal_start_tick), - slot, + next_tick.saturating_sub(ideal_target_tick), + next_tick / self.ticks_per_slot, self.start_slot, ); } } - (false, 0, slot, self.start_slot) + (false, 0, next_tick / self.ticks_per_slot, self.start_slot) } + // returns (start_leader_at_tick, last_leader_tick) given the next slot this + // recorder will lead fn compute_leader_slot_ticks( - next_leader_slot: &Option, + next_leader_slot: Option, ticks_per_slot: u64, grace_ticks: u64, ) -> (Option, u64) { @@ -182,33 +171,28 @@ impl PohRecorder { } // synchronize PoH with a bank - pub fn reset( - &mut self, - tick_height: u64, - blockhash: Hash, - start_slot: Slot, - next_leader_slot: Option, - ) { + pub fn reset(&mut self, blockhash: Hash, start_slot: Slot, next_leader_slot: Option) { self.clear_bank(); let mut cache = vec![]; { let mut poh = self.poh.lock().unwrap(); info!( - "reset poh from: {},{} to: {},{}", - poh.hash, self.tick_height, blockhash, tick_height, + "reset poh from: {},{},{} to: {},{}", + poh.hash, self.tick_height, self.start_slot, blockhash, start_slot ); poh.reset(blockhash, self.poh_config.hashes_per_tick); } std::mem::swap(&mut cache, &mut self.tick_cache); + self.start_slot = start_slot; - self.start_tick = tick_height + 1; - self.tick_height = tick_height; - self.max_last_leader_grace_ticks = self.ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR; + self.start_tick = (start_slot + 1) * self.ticks_per_slot; + self.tick_height = self.start_tick - 1; + let (start_leader_at_tick, last_leader_tick) = Self::compute_leader_slot_ticks( - &next_leader_slot, + next_leader_slot, self.ticks_per_slot, - self.max_last_leader_grace_ticks, + self.grace_ticks, ); self.start_leader_at_tick = start_leader_at_tick; self.last_leader_tick = last_leader_tick; @@ -256,7 +240,7 @@ impl PohRecorder { .take_while(|x| x.1 <= working_bank.max_tick_height) .count(); let send_result = if entry_count > 0 { - debug!( + trace!( "flush_cache: bank_slot: {} tick_height: {} max: {} sending: {}", working_bank.bank.slot(), working_bank.bank.tick_height(), @@ -396,12 +380,9 @@ impl PohRecorder { poh_config.hashes_per_tick, ))); let (sender, receiver) = channel(); - let max_last_leader_grace_ticks = ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR; - let (start_leader_at_tick, last_leader_tick) = Self::compute_leader_slot_ticks( - &next_leader_slot, - ticks_per_slot, - max_last_leader_grace_ticks, - ); + let grace_ticks = ticks_per_slot / GRACE_TICKS_FACTOR; + let (start_leader_at_tick, last_leader_tick) = + Self::compute_leader_slot_ticks(next_leader_slot, ticks_per_slot, grace_ticks); ( Self { poh, @@ -414,7 +395,7 @@ impl PohRecorder { start_tick: tick_height + 1, start_leader_at_tick, last_leader_tick, - max_last_leader_grace_ticks, + grace_ticks, id: *id, blocktree: blocktree.clone(), leader_schedule_cache: leader_schedule_cache.clone(), @@ -538,7 +519,7 @@ mod tests { ); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 1); - poh_recorder.reset(0, Hash::default(), 0, Some(4)); + poh_recorder.reset(Hash::default(), 0, Some(4)); assert_eq!(poh_recorder.tick_cache.len(), 0); } Blocktree::destroy(&ledger_path).unwrap(); @@ -896,7 +877,7 @@ mod tests { poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 2); let hash = poh_recorder.poh.lock().unwrap().hash; - poh_recorder.reset(poh_recorder.tick_height, hash, 0, Some(4)); + poh_recorder.reset(hash, 0, Some(4)); assert_eq!(poh_recorder.tick_cache.len(), 0); } Blocktree::destroy(&ledger_path).unwrap(); @@ -922,12 +903,7 @@ mod tests { poh_recorder.tick(); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 2); - poh_recorder.reset( - poh_recorder.tick_cache[0].1, - poh_recorder.tick_cache[0].0.hash, - 0, - Some(4), - ); + poh_recorder.reset(poh_recorder.tick_cache[0].0.hash, 0, Some(4)); assert_eq!(poh_recorder.tick_cache.len(), 0); } Blocktree::destroy(&ledger_path).unwrap(); @@ -935,6 +911,8 @@ mod tests { #[test] fn test_reset_to_new_value() { + solana_logger::setup(); + let ledger_path = get_tmp_ledger_path!(); { let blocktree = @@ -955,10 +933,10 @@ mod tests { poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 3); assert_eq!(poh_recorder.tick_height, 3); - poh_recorder.reset(1, hash(b"hello"), 0, Some(4)); + poh_recorder.reset(hash(b"hello"), 0, Some(4)); // parent slot 0 implies tick_height of 3 assert_eq!(poh_recorder.tick_cache.len(), 0); poh_recorder.tick(); - assert_eq!(poh_recorder.tick_height, 2); + assert_eq!(poh_recorder.tick_height, 4); } Blocktree::destroy(&ledger_path).unwrap(); } @@ -988,7 +966,7 @@ mod tests { max_tick_height: 3, }; poh_recorder.set_working_bank(working_bank); - poh_recorder.reset(1, hash(b"hello"), 0, Some(4)); + poh_recorder.reset(hash(b"hello"), 0, Some(4)); assert!(poh_recorder.working_bank.is_none()); } Blocktree::destroy(&ledger_path).unwrap(); @@ -1068,13 +1046,15 @@ mod tests { .is_err()); assert!(poh_recorder.working_bank.is_none()); // Make sure the starting slot is updated - assert_eq!(poh_recorder.start_slot(), end_slot); + assert_eq!(poh_recorder.start_slot, end_slot); } Blocktree::destroy(&ledger_path).unwrap(); } #[test] fn test_reached_leader_tick() { + solana_logger::setup(); + let ledger_path = get_tmp_ledger_path!(); { let blocktree = @@ -1097,13 +1077,13 @@ mod tests { // Test that with no leader slot, we don't reach the leader tick assert_eq!(poh_recorder.reached_leader_tick().0, false); - poh_recorder.reset(poh_recorder.tick_height(), bank.last_blockhash(), 0, None); + poh_recorder.reset(bank.last_blockhash(), 0, None); // Test that with no leader slot in reset(), we don't reach the leader tick assert_eq!(poh_recorder.reached_leader_tick().0, false); // Provide a leader slot 1 slot down - poh_recorder.reset(bank.ticks_per_slot(), bank.last_blockhash(), 0, Some(2)); + poh_recorder.reset(bank.last_blockhash(), 0, Some(2)); let init_ticks = poh_recorder.tick_height(); @@ -1121,27 +1101,16 @@ mod tests { // Test that we don't reach the leader tick because of grace ticks assert_eq!(poh_recorder.reached_leader_tick().0, false); - // reset poh now. it should discard the grace ticks wait - poh_recorder.reset( - poh_recorder.tick_height(), - bank.last_blockhash(), - 1, - Some(2), - ); - // without sending more ticks, we should be leader now + // reset poh now. we should immediately be leader + poh_recorder.reset(bank.last_blockhash(), 1, Some(2)); assert_eq!(poh_recorder.reached_leader_tick().0, true); assert_eq!(poh_recorder.reached_leader_tick().1, 0); // Now test that with grace ticks we can reach leader ticks // Set the leader slot 1 slot down - poh_recorder.reset( - poh_recorder.tick_height(), - bank.last_blockhash(), - 2, - Some(3), - ); + poh_recorder.reset(bank.last_blockhash(), 1, Some(3)); - // Send one slot worth of ticks + // Send one slot worth of ticks ("skips" slot 2) for _ in 0..bank.ticks_per_slot() { poh_recorder.tick(); } @@ -1150,62 +1119,38 @@ mod tests { assert_eq!(poh_recorder.reached_leader_tick().0, false); // Send 1 less tick than the grace ticks - for _ in 0..bank.ticks_per_slot() / MAX_LAST_LEADER_GRACE_TICKS_FACTOR - 1 { + for _ in 0..bank.ticks_per_slot() / GRACE_TICKS_FACTOR { poh_recorder.tick(); } - // We are still not the leader - assert_eq!(poh_recorder.reached_leader_tick().0, false); - - // Send one more tick - poh_recorder.tick(); // We should be the leader now assert_eq!(poh_recorder.reached_leader_tick().0, true); assert_eq!( poh_recorder.reached_leader_tick().1, - bank.ticks_per_slot() / MAX_LAST_LEADER_GRACE_TICKS_FACTOR + bank.ticks_per_slot() / GRACE_TICKS_FACTOR ); // Let's test that correct grace ticks are reported // Set the leader slot 1 slot down - poh_recorder.reset( - poh_recorder.tick_height(), - bank.last_blockhash(), - 3, - Some(4), - ); + poh_recorder.reset(bank.last_blockhash(), 2, Some(4)); - // Send remaining ticks for the slot (remember we sent extra ticks in the previous part of the test) - for _ in - bank.ticks_per_slot() / MAX_LAST_LEADER_GRACE_TICKS_FACTOR..bank.ticks_per_slot() - { + // send ticks for a slot + for _ in 0..bank.ticks_per_slot() { poh_recorder.tick(); } - // Send one extra tick before resetting (so that there's one grace tick) - poh_recorder.tick(); - // We are not the leader yet, as expected assert_eq!(poh_recorder.reached_leader_tick().0, false); - poh_recorder.reset( - poh_recorder.tick_height(), - bank.last_blockhash(), - 3, - Some(4), - ); + poh_recorder.reset(bank.last_blockhash(), 3, Some(4)); + // without sending more ticks, we should be leader now assert_eq!(poh_recorder.reached_leader_tick().0, true); - assert_eq!(poh_recorder.reached_leader_tick().1, 1); + assert_eq!(poh_recorder.reached_leader_tick().1, 0); // Let's test that if a node overshoots the ticks for its target // leader slot, reached_leader_tick() will return true, because it's overdue // Set the leader slot 1 slot down - poh_recorder.reset( - poh_recorder.tick_height(), - bank.last_blockhash(), - 4, - Some(5), - ); + poh_recorder.reset(bank.last_blockhash(), 4, Some(5)); // Send remaining ticks for the slot (remember we sent extra ticks in the previous part of the test) for _ in 0..4 * bank.ticks_per_slot() { @@ -1245,7 +1190,7 @@ mod tests { false ); - poh_recorder.reset(poh_recorder.tick_height(), bank.last_blockhash(), 0, None); + poh_recorder.reset(bank.last_blockhash(), 0, None); assert_eq!( poh_recorder.would_be_leader(2 * bank.ticks_per_slot()), @@ -1253,12 +1198,7 @@ mod tests { ); // We reset with leader slot after 3 slots - poh_recorder.reset( - poh_recorder.tick_height(), - bank.last_blockhash(), - 0, - Some(bank.slot() + 3), - ); + poh_recorder.reset(bank.last_blockhash(), 0, Some(bank.slot() + 3)); // Test that the node won't be leader in next 2 slots assert_eq!( diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 87fdb0565a..56806f3f8f 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -398,12 +398,10 @@ impl ReplayStage { ) { let next_leader_slot = leader_schedule_cache.next_leader_slot(&my_pubkey, bank.slot(), &bank, Some(blocktree)); - poh_recorder.lock().unwrap().reset( - bank.tick_height(), - bank.last_blockhash(), - bank.slot(), - next_leader_slot, - ); + poh_recorder + .lock() + .unwrap() + .reset(bank.last_blockhash(), bank.slot(), next_leader_slot); debug!( "{:?} voted and reset poh at {}. next leader slot {:?}", my_pubkey, @@ -642,12 +640,15 @@ impl ReplayStage { // Find the next slot that chains to the old slot let frozen_banks = forks.frozen_banks(); let frozen_bank_slots: Vec = frozen_banks.keys().cloned().collect(); - trace!("frozen_banks {:?}", frozen_bank_slots); let next_slots = blocktree .get_slots_since(&frozen_bank_slots) .expect("Db error"); // Filter out what we've already seen - trace!("generate new forks {:?}", next_slots); + trace!("generate new forks {:?}", { + let mut next_slots = next_slots.iter().collect::>(); + next_slots.sort(); + next_slots + }); for (parent_id, children) in next_slots { let parent_bank = frozen_banks .get(&parent_id)