From af03df38b905aa7765b013daaf0e2290c99cbe38 Mon Sep 17 00:00:00 2001 From: carllin Date: Wed, 13 Mar 2019 14:06:12 -0700 Subject: [PATCH] Don't vote for empty leader transmissions (#3248) * Don't vote for empty leader transmissions * Add is_delta flag to bank to detect empty leader transmissions * Plumb new is_votable flag through replay stage * Fix PohRecorder tests * Change is_delta to AtomicBool to avoid making Bank references mutable * Reset start slot in poh_recorder when working bank is cleared, so that connsecutive TPU's will start from the correct place * Use proper max tick height calculation * Test for not voting on empty transmission * tests for is_votable --- core/src/poh_recorder.rs | 70 +++++++++++++++++++------------------- core/src/replay_stage.rs | 72 ++++++++++++++++++++++++++++++++++------ runtime/src/bank.rs | 48 +++++++++++++++++++++++++-- 3 files changed, 141 insertions(+), 49 deletions(-) diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 0df9058e17..c604f5c33a 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -73,19 +73,6 @@ impl PohRecorder { // synchronize PoH with a bank pub fn reset(&mut self, tick_height: u64, blockhash: Hash, start_slot: u64) { self.clear_bank(); - let existing = self.tick_cache.iter().any(|(entry, entry_tick_height)| { - if entry.hash == blockhash { - assert_eq!(*entry_tick_height, tick_height); - } - entry.hash == blockhash - }); - if existing { - info!( - "reset skipped for: {},{}", - self.poh.hash, self.poh.tick_height - ); - return; - } let mut cache = vec![]; info!( "reset poh from: {},{} to: {},{}", @@ -159,6 +146,7 @@ impl PohRecorder { "poh_record: max_tick_height reached, setting working bank {} to None", working_bank.bank.slot() ); + self.start_slot = working_bank.max_tick_height / working_bank.bank.ticks_per_slot(); self.clear_bank(); } if e.is_err() { @@ -461,7 +449,7 @@ mod tests { poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 2); poh_recorder.reset(poh_recorder.poh.tick_height, poh_recorder.poh.hash, 0); - assert_eq!(poh_recorder.tick_cache.len(), 2); + assert_eq!(poh_recorder.tick_cache.len(), 0); } #[test] @@ -475,28 +463,7 @@ mod tests { poh_recorder.tick_cache[0].0.hash, 0, ); - assert_eq!(poh_recorder.tick_cache.len(), 2); - poh_recorder.reset( - poh_recorder.tick_cache[1].1, - poh_recorder.tick_cache[1].0.hash, - 0, - ); - assert_eq!(poh_recorder.tick_cache.len(), 2); - } - - #[test] - #[should_panic] - fn test_reset_with_cached_bad_height() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); - poh_recorder.tick(); - poh_recorder.tick(); - assert_eq!(poh_recorder.tick_cache.len(), 2); - //mixed up heights - poh_recorder.reset( - poh_recorder.tick_cache[0].1, - poh_recorder.tick_cache[1].0.hash, - 0, - ); + assert_eq!(poh_recorder.tick_cache.len(), 0); } #[test] @@ -539,4 +506,35 @@ mod tests { poh_recorder.clear_bank(); assert!(receiver.try_recv().is_ok()); } + + #[test] + fn test_poh_recorder_reset_start_slot() { + let ticks_per_slot = 5; + let (mut genesis_block, _mint_keypair) = GenesisBlock::new(2); + genesis_block.ticks_per_slot = ticks_per_slot; + let bank = Arc::new(Bank::new(&genesis_block)); + + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0); + + let end_slot = 3; + let max_tick_height = (end_slot + 1) * ticks_per_slot - 1; + let working_bank = WorkingBank { + bank, + min_tick_height: 1, + max_tick_height, + }; + + poh_recorder.set_working_bank(working_bank); + for _ in 0..max_tick_height { + poh_recorder.tick(); + } + + let tx = test_tx(); + let h1 = hash(b"hello world!"); + assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err()); + assert!(poh_recorder.working_bank.is_none()); + // Make sure the starting slot is updated + assert_eq!(poh_recorder.start_slot(), end_slot); + } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 026fb28222..80d34e92a9 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -20,7 +20,7 @@ use solana_sdk::timing::duration_as_ms; use solana_vote_api::vote_transaction::VoteTransaction; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; @@ -108,15 +108,13 @@ impl ReplayStage { } let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1; if bank.tick_height() == max_tick_height { - bank.freeze(); - info!("bank frozen {}", bank.slot()); - progress.remove(bank_slot); - if let Err(e) = - slot_full_sender.send((bank.slot(), bank.collector_id())) - { - info!("{} slot_full alert failed: {:?}", my_id, e); - } - votable.push(bank); + Self::process_completed_bank( + &my_id, + bank, + &mut progress, + &mut votable, + &slot_full_sender, + ); } } @@ -315,6 +313,24 @@ impl ReplayStage { Ok(()) } + fn process_completed_bank( + my_id: &Pubkey, + bank: Arc, + progress: &mut HashMap, + votable: &mut Vec>, + slot_full_sender: &Sender<(u64, Pubkey)>, + ) { + bank.freeze(); + info!("bank frozen {}", bank.slot()); + progress.remove(&bank.slot()); + if let Err(e) = slot_full_sender.send((bank.slot(), bank.collector_id())) { + info!("{} slot_full alert failed: {:?}", my_id, e); + } + if bank.is_votable() { + votable.push(bank); + } + } + fn generate_new_bank_forks(blocktree: &Blocktree, forks: &mut BankForks) { // Find the next slot that chains to the old slot let frozen_banks = forks.frozen_banks(); @@ -439,6 +455,42 @@ mod test { let _ignored = remove_dir_all(&my_ledger_path); } + #[test] + fn test_no_vote_empty_transmission() { + let genesis_block = GenesisBlock::new(10_000).0; + let bank = Arc::new(Bank::new(&genesis_block)); + let mut blockhash = bank.last_blockhash(); + let mut entries = Vec::new(); + for _ in 0..genesis_block.ticks_per_slot { + let entry = next_entry_mut(&mut blockhash, 1, vec![]); //just ticks + entries.push(entry); + } + let (sender, _receiver) = channel(); + + let mut progress = HashMap::new(); + let (forward_entry_sender, _forward_entry_receiver) = channel(); + ReplayStage::replay_entries_into_bank( + &bank, + entries.clone(), + &mut progress, + &forward_entry_sender, + 0, + ) + .unwrap(); + + let mut votable = vec![]; + ReplayStage::process_completed_bank( + &Pubkey::default(), + bank, + &mut progress, + &mut votable, + &sender, + ); + assert!(progress.is_empty()); + // Don't vote on slot that only contained ticks + assert!(votable.is_empty()); + } + #[test] fn test_replay_stage_poh_ok_entry_receiver() { let (forward_entry_sender, forward_entry_receiver) = channel(); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 596ef2e2d3..cdc26faa45 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -24,7 +24,7 @@ use solana_sdk::transaction::Transaction; use solana_vote_api::vote_instruction::Vote; use solana_vote_api::vote_state::{Lockout, VoteState}; use std::result; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Instant; @@ -192,6 +192,10 @@ pub struct Bank { /// staked nodes on epoch boundaries, saved off when a bank.slot() is at /// a leader schedule boundary epoch_vote_accounts: HashMap>, + + /// A boolean reflecting whether any entries were recorded into the PoH + /// stream for the slot == self.slot + is_delta: AtomicBool, } impl Default for HashQueue { @@ -223,7 +227,6 @@ impl Bank { /// Create a new bank that points to an immutable checkpoint of another bank. pub fn new_from_parent(parent: &Arc, collector_id: &Pubkey, slot: u64) -> Self { parent.freeze(); - assert_ne!(slot, parent.slot()); let mut bank = Self::default(); @@ -684,6 +687,9 @@ impl Bank { if self.is_frozen() { warn!("=========== FIXME: commit_transactions() working on a frozen bank! ================"); } + + self.is_delta.store(true, Ordering::Relaxed); + // TODO: put this assert back in // assert!(!self.is_frozen()); let now = Instant::now(); @@ -838,7 +844,6 @@ impl Bank { // tick_height is using an AtomicUSize because AtomicU64 is not yet a stable API. // Until we can switch to AtomicU64, fail if usize is not the same as u64 assert_eq!(std::usize::MAX, 0xFFFF_FFFF_FFFF_FFFF); - self.tick_height.load(Ordering::SeqCst) as u64 } @@ -871,6 +876,11 @@ impl Bank { pub fn get_epoch_and_slot_index(&self, slot: u64) -> (u64, u64) { self.epoch_schedule.get_epoch_and_slot_index(slot) } + + pub fn is_votable(&self) -> bool { + let max_tick_height = (self.slot + 1) * self.ticks_per_slot - 1; + self.is_delta.load(Ordering::Relaxed) && self.tick_height() == max_tick_height + } } #[cfg(test)] @@ -878,6 +888,7 @@ mod tests { use super::*; use bincode::serialize; use solana_sdk::genesis_block::{GenesisBlock, BOOTSTRAP_LEADER_LAMPORTS}; + use solana_sdk::hash; use solana_sdk::native_program::ProgramError; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_instruction::SystemInstruction; @@ -1599,4 +1610,35 @@ mod tests { } } + #[test] + fn test_is_delta_true() { + let (genesis_block, mint_keypair) = GenesisBlock::new(500); + let bank = Arc::new(Bank::new(&genesis_block)); + let key1 = Keypair::new(); + let tx_move_mint_to_1 = + SystemTransaction::new_move(&mint_keypair, &key1.pubkey(), 1, genesis_block.hash(), 0); + assert_eq!(bank.process_transaction(&tx_move_mint_to_1), Ok(())); + assert_eq!(bank.is_delta.load(Ordering::Relaxed), true); + } + + #[test] + fn test_is_votable() { + let (genesis_block, mint_keypair) = GenesisBlock::new(500); + let bank = Arc::new(Bank::new(&genesis_block)); + let key1 = Keypair::new(); + assert_eq!(bank.is_votable(), false); + + // Set is_delta to true + let tx_move_mint_to_1 = + SystemTransaction::new_move(&mint_keypair, &key1.pubkey(), 1, genesis_block.hash(), 0); + assert_eq!(bank.process_transaction(&tx_move_mint_to_1), Ok(())); + assert_eq!(bank.is_votable(), false); + + // Register enough ticks to hit max tick height + for i in 0..genesis_block.ticks_per_slot - 1 { + bank.register_tick(&hash::hash(format!("hello world {}", i).as_bytes())); + } + + assert_eq!(bank.is_votable(), true); + } }