From 37003da8548216b5820e8144518b93184c8e1730 Mon Sep 17 00:00:00 2001 From: carllin Date: Thu, 31 Jan 2019 19:21:02 -0800 Subject: [PATCH] Fix potential of checking tvu bank for truth when its behind (#2614) * Fix race between tpu and tvu, where tvu bank is not caught up to tpu bank * Add test * Cleanup Fullnode tests --- src/bank.rs | 20 +++- src/banking_stage.rs | 29 +++-- src/fullnode.rs | 248 +++++++++++++++++++++++++++++-------------- src/poh_recorder.rs | 4 + src/poh_service.rs | 19 +++- src/tpu.rs | 6 +- tests/multinode.rs | 4 +- 7 files changed, 230 insertions(+), 100 deletions(-) diff --git a/src/bank.rs b/src/bank.rs index a0505936e5..0989ea86df 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -10,7 +10,8 @@ use crate::entry::EntrySlice; use crate::genesis_block::GenesisBlock; use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS}; use crate::leader_scheduler::LeaderScheduler; -use crate::poh_recorder::PohRecorder; +use crate::poh_recorder::{PohRecorder, PohRecorderError}; +use crate::result::Error; use crate::runtime::{self, RuntimeError}; use crate::status_cache::StatusCache; use bincode::deserialize; @@ -74,6 +75,9 @@ pub enum BankError { /// Transaction has a fee but has no signature present MissingSignatureForFee, + + // Poh recorder hit the maximum tick height before leader rotation + MaxHeightReached, } pub type Result = result::Result; @@ -449,7 +453,12 @@ impl Bank { // record and unlock will unlock all the successfull transactions poh.record(hash, processed_transactions).map_err(|e| { warn!("record failure: {:?}", e); - BankError::RecordFailure + match e { + Error::PohRecorderError(PohRecorderError::MaxHeightReached) => { + BankError::MaxHeightReached + } + _ => BankError::RecordFailure, + } })?; } Ok(()) @@ -883,6 +892,11 @@ impl Bank { pub fn tick_height(&self) -> u64 { self.last_id_queue.read().unwrap().tick_height } + + #[cfg(test)] + pub fn last_ids(&self) -> &RwLock>> { + &self.last_ids + } } #[cfg(test)] @@ -1759,7 +1773,7 @@ mod tests { assert_eq!( bank.process_and_record_transactions(&transactions, &poh_recorder), - Err(BankError::RecordFailure) + Err(BankError::MaxHeightReached) ); assert_eq!(bank.get_balance(&pubkey), 1); diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 00c58111a3..26e6a8043b 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -31,8 +31,9 @@ use sys_info; #[derive(Debug, PartialEq, Eq, Clone)] pub enum BankingStageReturnType { - LeaderRotation, + LeaderRotation(u64), ChannelDisconnected, + RecordFailure, } // number of threads is 1 until mt bank is ready @@ -44,6 +45,7 @@ pub struct BankingStage { bank_thread_hdls: Vec>>, poh_service: PohService, compute_confirmation_service: ComputeLeaderConfirmationService, + max_tick_height: Option, } impl BankingStage { @@ -110,17 +112,25 @@ impl BankingStage { Error::SendError => { break Some(BankingStageReturnType::ChannelDisconnected); } - Error::PohRecorderError(PohRecorderError::MaxHeightReached) - | Error::BankError(BankError::RecordFailure) => { + Error::BankError(BankError::RecordFailure) => { + break Some(BankingStageReturnType::RecordFailure); + } + Error::PohRecorderError(PohRecorderError::MaxHeightReached) => { + assert!(max_tick_height.is_some()); + let max_tick_height = max_tick_height.unwrap(); if !thread_did_notify_rotation.load(Ordering::Relaxed) { - let _ = - thread_sender.send(TpuReturnType::LeaderRotation); + // Leader rotation should only happen if a max_tick_height was specified + let _ = thread_sender.send( + TpuReturnType::LeaderRotation(max_tick_height), + ); thread_did_notify_rotation .store(true, Ordering::Relaxed); } //should get restarted from the channel receiver - break Some(BankingStageReturnType::LeaderRotation); + break Some(BankingStageReturnType::LeaderRotation( + max_tick_height, + )); } _ => error!("solana-banking-stage-tx {:?}", e), } @@ -140,6 +150,7 @@ impl BankingStage { bank_thread_hdls, poh_service, compute_confirmation_service, + max_tick_height, }, entry_receiver, ) @@ -268,7 +279,9 @@ impl Service for BankingStage { match poh_return_value { Ok(_) => (), Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => { - return_value = Some(BankingStageReturnType::LeaderRotation); + return_value = Some(BankingStageReturnType::LeaderRotation( + self.max_tick_height.unwrap(), + )); } Err(Error::SendError) => { return_value = Some(BankingStageReturnType::ChannelDisconnected); @@ -504,7 +517,7 @@ mod tests { ); assert_eq!( banking_stage.join().unwrap(), - Some(BankingStageReturnType::LeaderRotation) + Some(BankingStageReturnType::LeaderRotation(max_tick_height)) ); } } diff --git a/src/fullnode.rs b/src/fullnode.rs index 77efbd822d..185467a265 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -26,7 +26,9 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::channel; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, RwLock}; +use std::thread::sleep; use std::thread::Result; +use std::time::Duration; use std::time::Instant; pub type TvuRotationSender = Sender; @@ -61,7 +63,7 @@ impl NodeServices { } } -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub enum FullnodeReturnType { LeaderToValidatorRotation, ValidatorToLeaderRotation, @@ -281,10 +283,21 @@ impl Fullnode { } } - pub fn leader_to_validator(&mut self) -> Result<()> { + pub fn leader_to_validator(&mut self, tick_height: u64) -> Result<()> { trace!("leader_to_validator"); - let (scheduled_leader, _) = self.bank.get_current_leader().unwrap(); + while self.bank.tick_height() < tick_height { + sleep(Duration::from_millis(10)); + } + + let (scheduled_leader, _) = self + .bank + .leader_scheduler + .read() + .unwrap() + .get_scheduled_leader(tick_height + 1) + .unwrap(); + self.cluster_info .write() .unwrap() @@ -295,7 +308,7 @@ impl Fullnode { // check for that if scheduled_leader == self.id { let (last_entry_id, entry_height) = self.node_services.tvu.get_state(); - self.validator_to_leader(self.bank.tick_height(), entry_height, last_entry_id); + self.validator_to_leader(tick_height, entry_height, last_entry_id); Ok(()) } else { self.node_services.tpu.switch_to_forwarder( @@ -354,8 +367,8 @@ impl Fullnode { return Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)); } _ => match should_be_forwarder { - Ok(TpuReturnType::LeaderRotation) => { - self.leader_to_validator()?; + Ok(TpuReturnType::LeaderRotation(tick_height)) => { + self.leader_to_validator(tick_height)?; return Ok(Some(FullnodeReturnType::LeaderToValidatorRotation)); } _ => { @@ -456,11 +469,13 @@ mod tests { use crate::leader_scheduler::{ make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig, }; + use crate::poh_service::NUM_TICKS_PER_SECOND; use crate::service::Service; use crate::streamer::responder; use crate::tpu::TpuReturnType; use crate::tvu::TvuReturnType; use crate::vote_signer_proxy::VoteSignerProxy; + use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::cmp; use std::fs::remove_dir_all; @@ -600,45 +615,22 @@ mod tests { fn test_wrong_role_transition() { solana_logger::setup(); - // Create the leader node information + // Create the leader and validator nodes let bootstrap_leader_keypair = Arc::new(Keypair::new()); - let bootstrap_leader_node = - Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey()); - let bootstrap_leader_info = bootstrap_leader_node.info.clone(); - - // Create the validator node information - let validator_keypair = Keypair::new(); - let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); - - // Make a common mint and a genesis entry for both leader + validator's ledgers - let (mint_keypair, bootstrap_leader_ledger_path, genesis_entry_height, last_id) = - create_tmp_sample_ledger( - "test_wrong_role_transition", - 10_000, + let validator_keypair = Arc::new(Keypair::new()); + let (bootstrap_leader_node, validator_node, bootstrap_leader_ledger_path, _, _) = + setup_leader_validator( + &bootstrap_leader_keypair, + &validator_keypair, 0, - bootstrap_leader_keypair.pubkey(), - 500, + 10, + "test_wrong_role_transition", ); - - // Write the entries to the ledger that will cause leader rotation - // after the bootstrap height - let validator_keypair = Arc::new(validator_keypair); - let (active_set_entries, _) = - make_active_set_entries(&validator_keypair, &mint_keypair, &last_id, &last_id, 10); - - { - let db_ledger = DbLedger::open(&bootstrap_leader_ledger_path).unwrap(); - db_ledger - .write_entries( - DEFAULT_SLOT_HEIGHT, - genesis_entry_height, - &active_set_entries, - ) - .unwrap(); - } + let bootstrap_leader_info = bootstrap_leader_node.info.clone(); let validator_ledger_path = tmp_copy_ledger(&bootstrap_leader_ledger_path, "test_wrong_role_transition"); + let ledger_paths = vec![ bootstrap_leader_ledger_path.clone(), validator_ledger_path.clone(), @@ -697,51 +689,22 @@ mod tests { #[test] fn test_validator_to_leader_transition() { - // Make a leader identity - let leader_keypair = Keypair::new(); - let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - let leader_id = leader_node.info.id; - - // Create validator identity - let (mint_keypair, validator_ledger_path, genesis_entry_height, last_id) = - create_tmp_sample_ledger( + // Make leader and validator node + let leader_keypair = Arc::new(Keypair::new()); + let validator_keypair = Arc::new(Keypair::new()); + let num_genesis_ticks = 1; + let (leader_node, validator_node, validator_ledger_path, ledger_initial_len, last_id) = + setup_leader_validator( + &leader_keypair, + &validator_keypair, + num_genesis_ticks, + 0, "test_validator_to_leader_transition", - 10_000, - 1, - leader_id, - 500, ); - let validator_keypair = Keypair::new(); - let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); + let leader_id = leader_keypair.pubkey(); let validator_info = validator_node.info.clone(); - let validator_keypair = Arc::new(validator_keypair); - // Write two entries so that the validator is in the active set: - // - // 1) Give the validator a nonzero number of tokens - // Write the bootstrap entries to the ledger that will cause leader rotation - // after the bootstrap height - // - // 2) A vote from the validator - let (active_set_entries, _) = - make_active_set_entries(&validator_keypair, &mint_keypair, &last_id, &last_id, 0); - let active_set_entries_len = active_set_entries.len() as u64; - let last_id = active_set_entries.last().unwrap().id; - - { - let db_ledger = DbLedger::open(&validator_ledger_path).unwrap(); - db_ledger - .write_entries( - DEFAULT_SLOT_HEIGHT, - genesis_entry_height, - &active_set_entries, - ) - .unwrap(); - } - - let ledger_initial_len = genesis_entry_height + active_set_entries_len; - // Set the leader scheduler for the validator let leader_rotation_interval = 16; let num_bootstrap_slots = 2; @@ -808,7 +771,7 @@ mod tests { break; } _ => match should_be_forwarder { - Ok(TpuReturnType::LeaderRotation) => { + Ok(TpuReturnType::LeaderRotation(_)) => { panic!("shouldn't be rotating to forwarder") } _ => continue, @@ -826,7 +789,7 @@ mod tests { assert!(bank.tick_height() >= bootstrap_height); // Only the first genesis entry has num_hashes = 0, every other entry // had num_hashes = 1 - assert!(entry_height >= bootstrap_height + active_set_entries_len); + assert!(entry_height >= bootstrap_height + ledger_initial_len - num_genesis_ticks); // Shut down t_responder.join().expect("responder thread join"); @@ -834,4 +797,127 @@ mod tests { .expect("Expected successful database destruction"); let _ignored = remove_dir_all(&validator_ledger_path).unwrap(); } + + #[test] + fn test_tvu_behind() { + // Make leader node + let leader_keypair = Arc::new(Keypair::new()); + let validator_keypair = Arc::new(Keypair::new()); + + let (leader_node, _, leader_ledger_path, _, _) = + setup_leader_validator(&leader_keypair, &validator_keypair, 1, 0, "test_tvu_behind"); + + let leader_node_info = leader_node.info.clone(); + + // Set the leader scheduler for the validator + let leader_rotation_interval = NUM_TICKS_PER_SECOND as u64 * 5; + let bootstrap_height = leader_rotation_interval; + + let leader_scheduler_config = LeaderSchedulerConfig::new( + bootstrap_height, + leader_rotation_interval, + leader_rotation_interval * 2, + bootstrap_height, + ); + + let vote_signer = VoteSignerProxy::new_local(&leader_keypair); + // Start the bootstrap leader + let mut leader = Fullnode::new( + leader_node, + &leader_keypair, + &leader_ledger_path, + Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config))), + vote_signer, + Some(&leader_node_info), + Default::default(), + ); + + // Hold Tvu bank lock to prevent tvu from making progress + { + let w_last_ids = leader.bank.last_ids().write().unwrap(); + + // Wait for leader -> validator transition + let signal = leader + .role_notifiers + .1 + .recv() + .expect("signal for leader -> validator transition"); + let (rn_sender, rn_receiver) = channel(); + rn_sender.send(signal).expect("send"); + leader.role_notifiers = (leader.role_notifiers.0, rn_receiver); + + // Make sure the tvu bank is behind + assert!(w_last_ids.tick_height < bootstrap_height); + } + + // Release tvu bank lock, tvu should start making progress again and + // handle_role_transition should sucessfully rotate the leader to a validator + assert_eq!( + leader.handle_role_transition().unwrap().unwrap(), + FullnodeReturnType::LeaderToValidatorRotation + ); + assert_eq!( + leader.cluster_info.read().unwrap().leader_id(), + validator_keypair.pubkey(), + ); + assert!(!leader.node_services.tpu.is_leader()); + // Confirm the bank actually made progress + assert_eq!(leader.bank.tick_height(), bootstrap_height); + + // Shut down + leader.close().expect("leader shutdown"); + DbLedger::destroy(&leader_ledger_path).expect("Expected successful database destruction"); + let _ignored = remove_dir_all(&leader_ledger_path).unwrap(); + } + + fn setup_leader_validator( + leader_keypair: &Arc, + validator_keypair: &Arc, + num_genesis_ticks: u64, + num_ending_ticks: u64, + test_name: &str, + ) -> (Node, Node, String, u64, Hash) { + // Make a leader identity + let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader_id = leader_node.info.id; + + // Create validator identity + let (mint_keypair, ledger_path, genesis_entry_height, last_id) = + create_tmp_sample_ledger(test_name, 10_000, num_genesis_ticks, leader_id, 500); + + let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); + + // Write two entries so that the validator is in the active set: + // + // 1) Give the validator a nonzero number of tokens + // Write the bootstrap entries to the ledger that will cause leader rotation + // after the bootstrap height + // + // 2) A vote from the validator + let (active_set_entries, _) = make_active_set_entries( + validator_keypair, + &mint_keypair, + &last_id, + &last_id, + num_ending_ticks, + ); + + let db_ledger = DbLedger::open(&ledger_path).unwrap(); + db_ledger + .write_entries( + DEFAULT_SLOT_HEIGHT, + genesis_entry_height, + &active_set_entries, + ) + .unwrap(); + + let entry_height = genesis_entry_height + active_set_entries.len() as u64; + ( + leader_node, + validator_node, + ledger_path, + entry_height, + active_set_entries.last().unwrap().id, + ) + } } diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs index 662b34cc8e..39acce7dc2 100644 --- a/src/poh_recorder.rs +++ b/src/poh_recorder.rs @@ -25,6 +25,10 @@ pub struct PohRecorder { } impl PohRecorder { + pub fn max_tick_height(&self) -> Option { + self.max_tick_height + } + pub fn hash(&self) -> Result<()> { // TODO: amortize the cost of this lock by doing the loop in here for // some min amount of hashes diff --git a/src/poh_service.rs b/src/poh_service.rs index 3068146f8f..22bfff6df7 100644 --- a/src/poh_service.rs +++ b/src/poh_service.rs @@ -2,7 +2,8 @@ //! "ticks", a measure of time in the PoH stream use crate::fullnode::TpuRotationSender; -use crate::poh_recorder::PohRecorder; +use crate::poh_recorder::{PohRecorder, PohRecorderError}; +use crate::result::Error; use crate::result::Result; use crate::service::Service; use crate::tpu::TpuReturnType; @@ -82,13 +83,20 @@ impl PohService { poh_exit: &AtomicBool, to_validator_sender: &TpuRotationSender, ) -> Result<()> { + let max_tick_height = poh.max_tick_height(); loop { match config { Config::Tick(num) => { for _ in 1..num { let res = poh.hash(); if let Err(e) = res { - to_validator_sender.send(TpuReturnType::LeaderRotation)?; + if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e { + // Leader rotation should only happen if a max_tick_height was specified + assert!(max_tick_height.is_some()); + to_validator_sender.send(TpuReturnType::LeaderRotation( + max_tick_height.unwrap(), + ))?; + } return Err(e); } } @@ -99,7 +107,12 @@ impl PohService { } let res = poh.tick(); if let Err(e) = res { - to_validator_sender.send(TpuReturnType::LeaderRotation)?; + if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e { + // Leader rotation should only happen if a max_tick_height was specified + assert!(max_tick_height.is_some()); + to_validator_sender + .send(TpuReturnType::LeaderRotation(max_tick_height.unwrap()))?; + } return Err(e); } if poh_exit.load(Ordering::Relaxed) { diff --git a/src/tpu.rs b/src/tpu.rs index dc2be7f07b..e2e839680f 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -22,7 +22,7 @@ use std::sync::{Arc, RwLock}; use std::thread; pub enum TpuReturnType { - LeaderRotation, + LeaderRotation(u64), } pub enum TpuMode { @@ -270,8 +270,8 @@ impl Service for Tpu { svcs.sigverify_stage.join()?; svcs.cluster_info_vote_listener.join()?; match svcs.banking_stage.join()? { - Some(BankingStageReturnType::LeaderRotation) => { - Ok(Some(TpuReturnType::LeaderRotation)) + Some(BankingStageReturnType::LeaderRotation(tick_height)) => { + Ok(Some(TpuReturnType::LeaderRotation(tick_height))) } _ => Ok(None), } diff --git a/tests/multinode.rs b/tests/multinode.rs index 727eabe086..b08b9a5acd 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -1266,9 +1266,9 @@ fn run_node(id: Pubkey, mut fullnode: Fullnode, should_exit: Arc) -> fullnode.validator_to_leader(tick_height, entry_height, last_entry_id); } Err(_) => match should_be_fwdr { - Ok(TpuReturnType::LeaderRotation) => { + Ok(TpuReturnType::LeaderRotation(tick_height)) => { fullnode - .leader_to_validator() + .leader_to_validator(tick_height) .expect("failed when transitioning to validator"); } Err(_) => {