diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index c833f5421b..cae3eb8322 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -437,8 +437,13 @@ pub fn create_test_recorder( Receiver, ) { let exit = Arc::new(AtomicBool::new(false)); - let (poh_recorder, entry_receiver) = - PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot()); + let (poh_recorder, entry_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + Some(4), + bank.ticks_per_slot(), + ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit); (exit, poh_recorder, poh_service, entry_receiver) @@ -670,8 +675,13 @@ mod tests { max_tick_height: std::u64::MAX, }; - let (poh_recorder, entry_receiver) = - PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot()); + let (poh_recorder, entry_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + None, + bank.ticks_per_slot(), + ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); poh_recorder.lock().unwrap().set_working_bank(working_bank); @@ -723,8 +733,13 @@ mod tests { min_tick_height: bank.tick_height(), max_tick_height: bank.tick_height() + 1, }; - let (poh_recorder, entry_receiver) = - PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot()); + let (poh_recorder, entry_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + Some(4), + bank.ticks_per_slot(), + ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); poh_recorder.lock().unwrap().set_working_bank(working_bank); diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index c4802f16af..97b4326ca1 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -9,6 +9,7 @@ use crate::entry::create_ticks; use crate::entry::next_entry_mut; use crate::entry::Entry; use crate::gossip_service::GossipService; +use crate::leader_schedule_utils; use crate::poh_recorder::PohRecorder; use crate::poh_service::{PohService, PohServiceConfig}; use crate::rpc::JsonRpcConfig; @@ -106,8 +107,13 @@ impl Fullnode { bank.tick_height(), bank.last_blockhash(), ); - let (poh_recorder, entry_receiver) = - PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot()); + let (poh_recorder, entry_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + leader_schedule_utils::next_leader_slot(&id, bank.slot(), &bank), + bank.ticks_per_slot(), + ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit); poh_recorder.lock().unwrap().clear_bank_signal = diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index c604f5c33a..5e9f0f146e 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -19,6 +19,8 @@ use solana_sdk::transaction::Transaction; use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; use std::sync::Arc; +const MAX_LAST_LEADER_GRACE_TICKS_FACTOR: u64 = 2; + #[derive(Debug, PartialEq, Eq, Clone)] pub enum PohRecorderError { InvalidCallingObject, @@ -39,9 +41,12 @@ pub struct PohRecorder { pub poh: Poh, pub clear_bank_signal: Option>, start_slot: u64, + start_tick: u64, tick_cache: Vec<(Entry, u64)>, working_bank: Option, sender: Sender, + start_leader_at_tick: Option, + max_last_leader_grace_ticks: u64, } impl PohRecorder { @@ -70,8 +75,34 @@ impl PohRecorder { self.poh.tick_height } + pub fn reached_leader_tick(&self) -> bool { + self.start_leader_at_tick + .map(|target_tick| { + // Either grace period has expired, + // or target tick is = grace period (i.e. poh recorder was just reset) + info!( + "Current tick {}, start tick {} target {}, grace {}", + self.tick_height(), + self.start_tick, + target_tick, + self.max_last_leader_grace_ticks + ); + self.tick_height() >= target_tick + || self.max_last_leader_grace_ticks + >= target_tick.saturating_sub(self.start_tick) + }) + .unwrap_or(false) + } + // synchronize PoH with a bank - pub fn reset(&mut self, tick_height: u64, blockhash: Hash, start_slot: u64) { + pub fn reset( + &mut self, + tick_height: u64, + blockhash: Hash, + start_slot: u64, + my_next_leader_slot: Option, + ticks_per_slot: u64, + ) { self.clear_bank(); let mut cache = vec![]; info!( @@ -80,7 +111,12 @@ impl PohRecorder { ); std::mem::swap(&mut cache, &mut self.tick_cache); self.start_slot = start_slot; + self.start_tick = tick_height + 1; self.poh = Poh::new(blockhash, tick_height); + self.max_last_leader_grace_ticks = ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR; + self.start_leader_at_tick = my_next_leader_slot + .map(|slot| Some(slot * ticks_per_slot + self.max_last_leader_grace_ticks)) + .unwrap_or(None); } pub fn set_working_bank(&mut self, working_bank: WorkingBank) { @@ -162,6 +198,10 @@ impl PohRecorder { } pub fn tick(&mut self) { + if self.start_leader_at_tick.is_none() { + return; + } + let tick = self.generate_tick(); trace!("tick {}", tick.1); self.tick_cache.push(tick); @@ -180,6 +220,8 @@ impl PohRecorder { tick_height: u64, last_entry_hash: Hash, start_slot: u64, + my_leader_slot_index: Option, + ticks_per_slot: u64, ) -> (Self, Receiver) { let poh = Poh::new(last_entry_hash, tick_height); let (sender, receiver) = channel(); @@ -191,6 +233,16 @@ impl PohRecorder { sender, clear_bank_signal: None, start_slot, + start_tick: tick_height + 1, + start_leader_at_tick: my_leader_slot_index + .map(|slot| { + Some( + slot * ticks_per_slot + + ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR, + ) + }) + .unwrap_or(None), + max_last_leader_grace_ticks: ticks_per_slot / MAX_LAST_LEADER_GRACE_TICKS_FACTOR, }, receiver, ) @@ -236,13 +288,15 @@ mod tests { use crate::test_tx::test_tx; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::hash; + use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT; use std::sync::mpsc::sync_channel; use std::sync::Arc; #[test] fn test_poh_recorder_no_zero_tick() { let prev_hash = Hash::default(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0); + let (mut poh_recorder, _entry_receiver) = + PohRecorder::new(0, prev_hash, 0, Some(4), DEFAULT_TICKS_PER_SLOT); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 1); assert_eq!(poh_recorder.tick_cache[0].1, 1); @@ -252,7 +306,8 @@ mod tests { #[test] fn test_poh_recorder_tick_height_is_last_tick() { let prev_hash = Hash::default(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0); + let (mut poh_recorder, _entry_receiver) = + PohRecorder::new(0, prev_hash, 0, Some(4), DEFAULT_TICKS_PER_SLOT); poh_recorder.tick(); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 2); @@ -262,10 +317,11 @@ mod tests { #[test] fn test_poh_recorder_reset_clears_cache() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); + let (mut poh_recorder, _entry_receiver) = + PohRecorder::new(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 1); - poh_recorder.reset(0, Hash::default(), 0); + poh_recorder.reset(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); assert_eq!(poh_recorder.tick_cache.len(), 0); } @@ -274,7 +330,8 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0); + let (mut poh_recorder, _entry_receiver) = + PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); let working_bank = WorkingBank { bank, @@ -292,7 +349,8 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); + let (mut poh_recorder, entry_receiver) = + PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); let working_bank = WorkingBank { bank: bank.clone(), @@ -322,7 +380,8 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); + let (mut poh_recorder, entry_receiver) = + PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); poh_recorder.tick(); poh_recorder.tick(); @@ -350,7 +409,8 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); + let (mut poh_recorder, entry_receiver) = + PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); let working_bank = WorkingBank { bank, @@ -370,7 +430,8 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); + let (mut poh_recorder, entry_receiver) = + PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); let working_bank = WorkingBank { bank, @@ -399,7 +460,8 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); + let (mut poh_recorder, entry_receiver) = + PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); let working_bank = WorkingBank { bank, @@ -425,7 +487,8 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); + let (mut poh_recorder, entry_receiver) = + PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); let working_bank = WorkingBank { bank, @@ -444,17 +507,25 @@ mod tests { #[test] fn test_reset_current() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); + let (mut poh_recorder, _entry_receiver) = + PohRecorder::new(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); poh_recorder.tick(); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 2); - poh_recorder.reset(poh_recorder.poh.tick_height, poh_recorder.poh.hash, 0); + poh_recorder.reset( + poh_recorder.poh.tick_height, + poh_recorder.poh.hash, + 0, + Some(4), + DEFAULT_TICKS_PER_SLOT, + ); assert_eq!(poh_recorder.tick_cache.len(), 0); } #[test] fn test_reset_with_cached() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); + let (mut poh_recorder, _entry_receiver) = + PohRecorder::new(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); poh_recorder.tick(); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 2); @@ -462,19 +533,22 @@ mod tests { poh_recorder.tick_cache[0].1, poh_recorder.tick_cache[0].0.hash, 0, + Some(4), + DEFAULT_TICKS_PER_SLOT, ); assert_eq!(poh_recorder.tick_cache.len(), 0); } #[test] fn test_reset_to_new_value() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); + let (mut poh_recorder, _entry_receiver) = + PohRecorder::new(0, Hash::default(), 0, Some(4), DEFAULT_TICKS_PER_SLOT); poh_recorder.tick(); poh_recorder.tick(); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 3); assert_eq!(poh_recorder.poh.tick_height, 3); - poh_recorder.reset(1, hash(b"hello"), 0); + poh_recorder.reset(1, hash(b"hello"), 0, Some(4), DEFAULT_TICKS_PER_SLOT); assert_eq!(poh_recorder.tick_cache.len(), 0); poh_recorder.tick(); assert_eq!(poh_recorder.poh.tick_height, 2); @@ -484,14 +558,16 @@ mod tests { fn test_reset_clear_bank() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); + let (mut poh_recorder, _entry_receiver) = + PohRecorder::new(0, Hash::default(), 0, Some(4), bank.ticks_per_slot()); + let ticks_per_slot = bank.ticks_per_slot(); let working_bank = WorkingBank { bank, min_tick_height: 2, max_tick_height: 3, }; poh_recorder.set_working_bank(working_bank); - poh_recorder.reset(1, hash(b"hello"), 0); + poh_recorder.reset(1, hash(b"hello"), 0, Some(4), ticks_per_slot); assert!(poh_recorder.working_bank.is_none()); } @@ -499,7 +575,8 @@ mod tests { pub fn test_clear_signal() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); + let (mut poh_recorder, _entry_receiver) = + PohRecorder::new(0, Hash::default(), 0, None, bank.ticks_per_slot()); let (sender, receiver) = sync_channel(1); poh_recorder.set_bank(&bank); poh_recorder.clear_bank_signal = Some(sender); @@ -515,7 +592,8 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0); + let (mut poh_recorder, _entry_receiver) = + PohRecorder::new(0, prev_hash, 0, Some(4), bank.ticks_per_slot()); let end_slot = 3; let max_tick_height = (end_slot + 1) * ticks_per_slot - 1; @@ -537,4 +615,104 @@ mod tests { // Make sure the starting slot is updated assert_eq!(poh_recorder.start_slot(), end_slot); } + + #[test] + fn test_reached_leader_tick() { + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, _entry_receiver) = + PohRecorder::new(0, prev_hash, 0, None, bank.ticks_per_slot()); + + // Test that with no leader slot, we don't reach the leader tick + assert_eq!(poh_recorder.reached_leader_tick(), false); + + for _ in 0..bank.ticks_per_slot() { + poh_recorder.tick(); + } + + // Tick should not be recorded + assert_eq!(poh_recorder.tick_height(), 0); + + // Test that with no leader slot, we don't reach the leader tick after sending some ticks + assert_eq!(poh_recorder.reached_leader_tick(), false); + + poh_recorder.reset( + poh_recorder.tick_height(), + bank.last_blockhash(), + 0, + None, + bank.ticks_per_slot(), + ); + + // Test that with no leader slot in reset(), we don't reach the leader tick + assert_eq!(poh_recorder.reached_leader_tick(), false); + + // Provide a leader slot 1 slot down + poh_recorder.reset( + bank.ticks_per_slot(), + bank.last_blockhash(), + 0, + Some(2), + bank.ticks_per_slot(), + ); + + let init_ticks = poh_recorder.tick_height(); + + // Send one slot worth of ticks + for _ in 0..bank.ticks_per_slot() { + poh_recorder.tick(); + } + + // Tick should be recorded + assert_eq!( + poh_recorder.tick_height(), + init_ticks + bank.ticks_per_slot() + ); + + // Test that we don't reach the leader tick because of grace ticks + assert_eq!(poh_recorder.reached_leader_tick(), false); + + // reset poh now. it should discard the grace ticks wait + poh_recorder.reset( + poh_recorder.tick_height(), + bank.last_blockhash(), + 1, + Some(2), + bank.ticks_per_slot(), + ); + // without sending more ticks, we should be leader now + assert_eq!(poh_recorder.reached_leader_tick(), true); + + // Now test that with grace ticks we can reach leader ticks + // Set the leader slot 1 slot down + poh_recorder.reset( + poh_recorder.tick_height(), + bank.last_blockhash(), + 2, + Some(3), + bank.ticks_per_slot(), + ); + + // Send one slot worth of ticks + for _ in 0..bank.ticks_per_slot() { + poh_recorder.tick(); + } + + // We are not the leader yet, as expected + assert_eq!(poh_recorder.reached_leader_tick(), false); + + // Send 1 less tick than the grace ticks + for _ in 0..bank.ticks_per_slot() / MAX_LAST_LEADER_GRACE_TICKS_FACTOR - 1 { + poh_recorder.tick(); + } + // We are still not the leader + assert_eq!(poh_recorder.reached_leader_tick(), false); + + // Send one more tick + poh_recorder.tick(); + + // We should be the leader now + assert_eq!(poh_recorder.reached_leader_tick(), true); + } } diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index 2f97209993..5d1e378464 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -110,8 +110,13 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (poh_recorder, entry_receiver) = - PohRecorder::new(bank.tick_height(), prev_hash, bank.slot()); + let (poh_recorder, entry_receiver) = PohRecorder::new( + bank.tick_height(), + prev_hash, + bank.slot(), + Some(4), + bank.ticks_per_slot(), + ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let exit = Arc::new(AtomicBool::new(false)); let working_bank = WorkingBank { diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index cf9fbf040c..df246971ae 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -142,14 +142,37 @@ impl ReplayStage { ); cluster_info.write().unwrap().push_vote(vote); } + let next_leader_slot = + leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank); poh_recorder.lock().unwrap().reset( bank.tick_height(), bank.last_blockhash(), bank.slot(), + next_leader_slot, + ticks_per_slot, + ); + info!( + "{:?} voted and reset poh at {}. next leader slot {:?}", + my_id, + bank.tick_height(), + next_leader_slot ); is_tpu_bank_active = false; } + let mut reached_leader_tick = false; + if !is_tpu_bank_active { + let poh = poh_recorder.lock().unwrap(); + reached_leader_tick = poh.reached_leader_tick(); + + info!( + "{:?} TPU bank inactive. poh tick {}, leader {}", + my_id, + poh.tick_height(), + reached_leader_tick + ); + }; + if !is_tpu_bank_active { assert!(ticks_per_slot > 0); let poh_tick_height = poh_recorder.lock().unwrap().tick_height(); @@ -164,6 +187,7 @@ impl ReplayStage { &cluster_info, &blocktree, poh_slot, + reached_leader_tick, ); } @@ -195,6 +219,7 @@ impl ReplayStage { cluster_info: &Arc>, blocktree: &Blocktree, poh_slot: u64, + reached_leader_tick: bool, ) { trace!("{} checking poh slot {}", my_id, poh_slot); if blocktree.meta(poh_slot).unwrap().is_some() { @@ -214,8 +239,9 @@ impl ReplayStage { my_id, next_leader, poh_slot ); cluster_info.write().unwrap().set_leader(&next_leader); - if next_leader == *my_id { + if next_leader == *my_id && reached_leader_tick { debug!("{} starting tpu for slot {}", my_id, poh_slot); + inc_new_counter_info!("replay_stage-new_leader", poh_slot as usize); let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot); bank_forks.write().unwrap().insert(poh_slot, tpu_bank); if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() {