From 0c592c52f664e7b8901db02380210ba429c7aa16 Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Mon, 11 Mar 2019 13:58:23 -0700 Subject: [PATCH] Wake up replay stage when the poh bank is cleared. (#3211) * wake up replay stage when the poh bank is cleared * bump ticks per second * Increase ticks per slot to match faster tick rate * Remove check that working bank must be the bank for the greatest slot * Make start_leader() skip starting TPU for slots we've already been leader for --- core/src/bank_forks.rs | 4 +--- core/src/blocktree.rs | 2 +- core/src/fullnode.rs | 7 +++++++ core/src/poh_recorder.rs | 24 +++++++++++++++++++++--- core/src/replay_stage.rs | 5 ++++- sdk/src/timing.rs | 4 ++-- 6 files changed, 36 insertions(+), 10 deletions(-) diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index b251e3a580..1e3f1e2763 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -68,9 +68,7 @@ impl BankForks { let prev = self.banks.insert(bank_slot, bank.clone()); assert!(prev.is_none()); - if bank_slot > self.working_bank.slot() { - self.working_bank = bank.clone() - } + self.working_bank = bank.clone(); // TODO: this really only needs to look at the first // parent if we're always calling insert() diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 25f4f28ef7..d8425e87d7 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -324,7 +324,7 @@ pub struct Blocktree { meta_cf: MetaCf, data_cf: DataCf, erasure_cf: ErasureCf, - new_blobs_signals: Vec<SyncSender<bool>>, + pub new_blobs_signals: Vec<SyncSender<bool>>, ticks_per_slot: u64, } diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index 96ff4e6905..475de8a379 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -110,6 +110,13 @@ impl Fullnode { PohRecorder::new(bank.tick_height(), bank.last_blockhash()); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit); + 
poh_recorder.lock().unwrap().clear_bank_signal = + blocktree.new_blobs_signals.first().cloned(); + assert_eq!( + blocktree.new_blobs_signals.len(), + 1, + "New blob signal for the TVU should be the same as the clear bank signal." + ); info!("node info: {:?}", node.info); info!("node entrypoint_info: {:?}", entrypoint_info_option); diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 85ca3002f2..7ae89ea442 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -16,7 +16,7 @@ use crate::result::{Error, Result}; use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; use solana_sdk::transaction::Transaction; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; use std::sync::Arc; #[derive(Debug, PartialEq, Eq, Clone)] @@ -40,11 +40,15 @@ pub struct PohRecorder { tick_cache: Vec<(Entry, u64)>, working_bank: Option<WorkingBank>, sender: Sender<WorkingBankEntries>, + pub clear_bank_signal: Option<SyncSender<bool>>, } impl PohRecorder { pub fn clear_bank(&mut self) { self.working_bank = None; + if let Some(ref signal) = self.clear_bank_signal { + let _ = signal.try_send(true); + } } pub fn hash(&mut self) { @@ -147,12 +151,12 @@ impl PohRecorder { "poh_record: max_tick_height reached, setting working bank {} to None", working_bank.bank.slot() ); - self.working_bank = None; + self.clear_bank(); } if e.is_err() { info!("WorkingBank::sender disconnected {:?}", e); //revert the cache, but clear the working bank - self.working_bank = None; + self.clear_bank(); } else { //commit the flush let _ = self.tick_cache.drain(..cnt); @@ -185,6 +189,7 @@ impl PohRecorder { tick_cache: vec![], working_bank: None, sender, + clear_bank_signal: None, }, receiver, ) @@ -230,6 +235,7 @@ mod tests { use crate::test_tx::test_tx; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::hash; + use std::sync::mpsc::sync_channel; use std::sync::Arc; #[test] @@ -505,4 +511,16 @@ mod tests { poh_recorder.reset(1, hash(b"hello")); 
assert!(poh_recorder.working_bank.is_none()); } + + #[test] + pub fn test_clear_signal() { + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); + let (sender, receiver) = sync_channel(1); + poh_recorder.set_bank(&bank); + poh_recorder.clear_bank_signal = Some(sender); + poh_recorder.clear_bank(); + assert!(receiver.try_recv().is_ok()); + } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index ad8bad10c3..5cce8dca53 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -188,8 +188,11 @@ impl ReplayStage { if let Some((_, parent)) = newest_frozen.last() { let poh_tick_height = poh_recorder.lock().unwrap().tick_height(); let poh_slot = leader_schedule_utils::tick_height_to_slot(parent, poh_tick_height + 1); - assert!(frozen.get(&poh_slot).is_none()); trace!("checking poh slot for leader {}", poh_slot); + if frozen.get(&poh_slot).is_some() { + // Already been a leader for this slot, skip it + return; + } if bank_forks.read().unwrap().get(poh_slot).is_none() { leader_schedule_utils::slot_leader_at(poh_slot, parent) .map(|next_leader| { diff --git a/sdk/src/timing.rs b/sdk/src/timing.rs index f25cde2944..50d6ac0175 100644 --- a/sdk/src/timing.rs +++ b/sdk/src/timing.rs @@ -2,11 +2,11 @@ use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; -pub const NUM_TICKS_PER_SECOND: u64 = 10; +pub const NUM_TICKS_PER_SECOND: u64 = 100; // At 10 ticks/s, 8 ticks per slot implies that leader rotation and voting will happen // every 800 ms. A fast voting cadence ensures faster finality and convergence -pub const DEFAULT_TICKS_PER_SLOT: u64 = 16; +pub const DEFAULT_TICKS_PER_SLOT: u64 = 160; pub const DEFAULT_SLOTS_PER_EPOCH: u64 = 16; /// The time window of recent block hash values that the bank will track the signatures