From cb8661bd49bb3591360589cba73d3593249b9fcc Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Sat, 19 Sep 2020 14:03:54 +0900
Subject: [PATCH] Persistent tower (#10718)

* Save/restore Tower
* Avoid unwrap()
* Rebase cleanups
* Forcibly pass test
* Correct reconciliation of votes after validator resume
* d b g
* Add more tests
* fsync and fix test
* Add test
* Fix fmt
* Debug
* Fix tests...
* save
* Clarify error message and code cleaning around it
* Move most of code out of tower save hot codepath
* Proper comment for the lack of fsync on tower
* Clean up
* Clean up
* Simpler type alias
* Manage tower-restored ancestor slots without banks
* Add comment
* Extract long code blocks...
* Add comment
* Simplify returned tuple...
* Tweak too aggressive log
* Fix typo...
* Add test
* Update comment
* Improve test to require non-empty stray restored slots
* Measure tower save and dump all tower contents
* Log adjust and add threshold related assertions
* cleanup adjust
* Properly lower stray restored slots priority...
* Rust fmt
* Fix test....
* Clarify comments a bit and add TowerError::TooNew
* Further clean-up around TowerError
* Truly create ancestors by excluding last vote slot
* Add comment for stray_restored_slots
* Add comment for stray_restored_slots
* Use BTreeSet
* Consider root_slot into post-replay adjustment
* Tweak logging
* Add test for stray_restored_ancestors
* Reorder some code
* Better names for unit tests
* Add frozen_abi to SavedTower
* Fold long lines
* Tweak stray ancestors and too old slot history
* Re-adjust error condition of too old slot history
* Test normal ancestors are checked before stray ones
* Fix conflict, update tests, adjust behavior a bit
* Fix test
* Address review comments
* Last touch!
* Immediately after creating cleaning pr
* Revert stray slots
* Revert comment...
* Report error as metrics
* Revert not to panic! and ignore unfixable test...
* Normalize lockouts.root_slot more strictly
* Add comments for panic!
and more assertions * Proper initialize root without vote account * Clarify code and comments based on review feedback * Fix rebase * Further simplify based on assured tower root * Reorder code for more readability Co-authored-by: Michael Vines --- core/benches/consensus.rs | 36 + core/src/consensus.rs | 1149 +++++++++++++++++++++- core/src/heaviest_subtree_fork_choice.rs | 108 +- core/src/replay_stage.rs | 127 ++- core/src/tvu.rs | 5 + core/src/validator.rs | 101 +- ledger/src/ancestor_iterator.rs | 36 + local-cluster/tests/local_cluster.rs | 233 ++++- multinode-demo/bootstrap-validator.sh | 1 + multinode-demo/validator.sh | 1 + programs/vote/src/vote_state/mod.rs | 3 + run.sh | 1 + runtime/src/bank.rs | 4 + sdk/src/slot_history.rs | 6 +- validator/src/main.rs | 7 + 15 files changed, 1712 insertions(+), 106 deletions(-) create mode 100644 core/benches/consensus.rs diff --git a/core/benches/consensus.rs b/core/benches/consensus.rs new file mode 100644 index 000000000..64035f4c3 --- /dev/null +++ b/core/benches/consensus.rs @@ -0,0 +1,36 @@ +#![feature(test)] + +extern crate solana_core; +extern crate test; + +use solana_core::consensus::Tower; +use solana_runtime::bank::Bank; +use solana_runtime::bank_forks::BankForks; +use solana_sdk::{ + pubkey::Pubkey, + signature::{Keypair, Signer}, +}; +use std::sync::Arc; +use tempfile::TempDir; +use test::Bencher; + +#[bench] +fn bench_save_tower(bench: &mut Bencher) { + let dir = TempDir::new().unwrap(); + let path = dir.path(); + + let vote_account_pubkey = &Pubkey::default(); + let node_keypair = Arc::new(Keypair::new()); + let heaviest_bank = BankForks::new(Bank::default()).working_bank(); + let tower = Tower::new( + &node_keypair.pubkey(), + &vote_account_pubkey, + 0, + &heaviest_bank, + &path, + ); + + bench.iter(move || { + tower.save(&node_keypair).unwrap(); + }); +} diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 249853cc0..214e5f8d3 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -3,6 +3,8 @@ use crate::{ pubkey_references::PubkeyReferences, }; use chrono::prelude::*; +use solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db}; +use solana_measure::measure::Measure; use solana_runtime::{bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE}; use solana_sdk::{ account::Account, @@ -10,16 +12,23 @@ use solana_sdk::{ hash::Hash, instruction::Instruction, pubkey::Pubkey, + signature::{Keypair, Signature, Signer}, + slot_history::{Check, SlotHistory}, }; use solana_vote_program::{ vote_instruction, vote_state::{BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY}, }; use std::{ + cmp::Ordering, collections::{HashMap, HashSet}, + fs::{self, File}, + io::BufReader, ops::Bound::{Included, Unbounded}, - sync::{Arc, RwLock}, + path::{Path, PathBuf}, + sync::Arc, }; +use thiserror::Error; #[derive(PartialEq, Clone, Debug)] pub enum SwitchForkDecision { @@ -57,6 +66,8 @@ impl SwitchForkDecision { pub const VOTE_THRESHOLD_DEPTH: usize = 8; pub const SWITCH_FORK_THRESHOLD: f64 = 0.38; +pub type Result = std::result::Result; + pub type Stake = u64; pub type VotedStakes = HashMap; pub type PubkeyVotes = Vec<(Pubkey, Slot)>; @@ -72,7 +83,7 @@ pub(crate) struct ComputedBankState { } #[frozen_abi(digest = "2ZUeCLMVQxmHYbeqMH7M97ifVSKoVErGvRHzyxcQRjgU")] -#[derive(Serialize, AbiExample)] +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)] pub struct Tower { node_pubkey: Pubkey, threshold_depth: usize, @@ -80,18 +91,34 @@ pub struct Tower { 
lockouts: VoteState, last_vote: Vote, last_timestamp: BlockTimestamp, + #[serde(skip)] + path: PathBuf, + #[serde(skip)] + tmp_path: PathBuf, // used before atomic fs::rename() + #[serde(skip)] + // Restored last voted slot which cannot be found in SlotHistory at replayed root + // (This is a special field for slashing-free validator restart with edge cases). + // This could be emptied after some time; but left intact indefinitely for easier + // implementation + stray_restored_slot: Option, } impl Default for Tower { fn default() -> Self { - Self { + let mut tower = Self { node_pubkey: Pubkey::default(), threshold_depth: VOTE_THRESHOLD_DEPTH, threshold_size: VOTE_THRESHOLD_SIZE, lockouts: VoteState::default(), last_vote: Vote::default(), last_timestamp: BlockTimestamp::default(), - } + path: PathBuf::default(), + tmp_path: PathBuf::default(), + stray_restored_slot: Option::default(), + }; + // VoteState::root_slot is ensured to be Some in Tower + tower.lockouts.root_slot = Some(Slot::default()); + tower } } @@ -100,14 +127,23 @@ impl Tower { node_pubkey: &Pubkey, vote_account_pubkey: &Pubkey, root: Slot, - heaviest_bank: &Bank, + bank: &Bank, + path: &Path, ) -> Self { - let mut tower = Self::new_with_key(node_pubkey); - tower.initialize_lockouts_from_bank_forks(vote_account_pubkey, root, heaviest_bank); + let path = Self::get_filename(&path, node_pubkey); + let tmp_path = Self::get_tmp_filename(&path); + let mut tower = Self { + node_pubkey: *node_pubkey, + path, + tmp_path, + ..Tower::default() + }; + tower.initialize_lockouts_from_bank(vote_account_pubkey, root, bank); tower } + #[cfg(test)] pub fn new_with_key(node_pubkey: &Pubkey) -> Self { Self { node_pubkey: *node_pubkey, @@ -124,6 +160,40 @@ impl Tower { } } + pub fn new_from_bankforks( + bank_forks: &BankForks, + ledger_path: &Path, + my_pubkey: &Pubkey, + vote_account: &Pubkey, + ) -> Self { + let root_bank = bank_forks.root_bank(); + let (_progress, heaviest_subtree_fork_choice, unlock_heaviest_subtree_fork_choice_slot) = + crate::replay_stage::ReplayStage::initialize_progress_and_fork_choice( + root_bank, + bank_forks.frozen_banks().values().cloned().collect(), + &my_pubkey, + &vote_account, + ); + let root = root_bank.slot(); + + let heaviest_bank = if root > unlock_heaviest_subtree_fork_choice_slot { + bank_forks + .get(heaviest_subtree_fork_choice.best_overall_slot()) + .expect("The best overall slot must be one of `frozen_banks` which all exist in bank_forks") + .clone() + } else { + Tower::find_heaviest_bank(&bank_forks, &my_pubkey).unwrap_or_else(|| root_bank.clone()) + }; + + Self::new( + &my_pubkey, + &vote_account, + root, + &heaviest_bank, + &ledger_path, + ) + } + pub(crate) fn collect_vote_lockouts( node_pubkey: &Pubkey, bank_slot: Slot, @@ -324,17 +394,17 @@ impl Tower { } #[cfg(test)] - fn record_vote(&mut self, slot: Slot, hash: Hash) -> Option { + pub fn record_vote(&mut self, slot: Slot, hash: Hash) -> Option { let vote = Vote::new(vec![slot], hash); self.record_bank_vote(vote) } - pub fn last_vote(&self) -> &Vote { - &self.last_vote + pub fn last_voted_slot(&self) -> Option { + self.last_vote.last_voted_slot() } - pub fn last_voted_slot(&self) -> Option { - self.last_vote().last_voted_slot() + pub fn stray_restored_slot(&self) -> Option { + self.stray_restored_slot } pub fn last_vote_and_timestamp(&mut self) -> Vote { @@ -359,6 +429,13 @@ impl Tower { None } + // root may be forcibly set by arbitrary replay root slot, for example from a root + // after replaying a snapshot. 
+ // Also, tower.root() couldn't be None; do_initialize_lockouts() ensures that. + // Conceptually, every tower must have been constructed from a concrete starting point, + // which establishes the origin of trust (i.e. root) whether booting from genesis (slot 0) or + // snapshot (slot N). In other words, there should be no possibility a Tower doesn't have + // root, unlike young vote accounts. pub fn root(&self) -> Option { self.lockouts.root_slot } @@ -403,7 +480,13 @@ impl Tower { // This case should never happen because bank forks purges all // non-descendants of the root every time root is set if slot != root_slot { - assert!(ancestors[&slot].contains(&root_slot)); + assert!( + ancestors[&slot].contains(&root_slot), + "ancestors: {:?}, slot: {} root: {}", + ancestors[&slot], + slot, + root_slot + ); } } @@ -419,10 +502,31 @@ impl Tower { total_stake: u64, epoch_vote_accounts: &HashMap, ) -> SwitchForkDecision { - let root = self.lockouts.root_slot.unwrap_or(0); self.last_voted_slot() .map(|last_voted_slot| { - let last_vote_ancestors = ancestors.get(&last_voted_slot).unwrap(); + let root = self.lockouts.root_slot.unwrap_or(0); + let empty_ancestors = HashSet::default(); + + let last_vote_ancestors = + ancestors.get(&last_voted_slot).unwrap_or_else(|| { + if !self.is_stray_last_vote() { + // Unless last vote is stray, ancestors.get(last_voted_slot) must + // return Some(_), justifying to panic! here. + // Also, adjust_lockouts_after_replay() correctly makes last_voted_slot None, + // if all saved votes are ancestors of replayed_root_slot. So this code shouldn't be + // touched in that case as well. + // In other words, except being stray, all other slots have been voted on while + // this validator has been running, so we must be able to fetch ancestors for + // all of them. + panic!("no ancestors found with slot: {}", last_voted_slot); + } else { + // bank_forks doesn't have corresponding data for the stray restored last vote, + // meaning some inconsistency between saved tower and ledger. + // (newer snapshot, or only a saved tower is moved over to new setup?) 
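+ // Fall back to an empty ancestor set so the switch check can still proceed instead of panicking.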
+ &empty_ancestors + } + }); + let switch_slot_ancestors = ancestors.get(&switch_slot).unwrap(); if switch_slot == last_voted_slot || switch_slot_ancestors.contains(&last_voted_slot) { @@ -579,13 +683,11 @@ impl Tower { } pub(crate) fn find_heaviest_bank( - bank_forks: &RwLock, + bank_forks: &BankForks, node_pubkey: &Pubkey, ) -> Option> { - let ancestors = bank_forks.read().unwrap().ancestors(); + let ancestors = bank_forks.ancestors(); let mut bank_weights: Vec<_> = bank_forks - .read() - .unwrap() .frozen_banks() .values() .map(|b| { @@ -637,36 +739,367 @@ impl Tower { bank_weight } - fn initialize_lockouts_from_bank_forks( + fn voted_slots(&self) -> Vec { + self.lockouts + .votes + .iter() + .map(|lockout| lockout.slot) + .collect() + } + + pub fn is_stray_last_vote(&self) -> bool { + if let Some(last_voted_slot) = self.last_voted_slot() { + if let Some(stray_restored_slot) = self.stray_restored_slot { + return stray_restored_slot == last_voted_slot; + } + } + + false + } + + // The tower root can be older/newer if the validator booted from a newer/older snapshot, so + // tower lockouts may need adjustment + pub fn adjust_lockouts_after_replay( + self, + replayed_root: Slot, + slot_history: &SlotHistory, + ) -> Result { + info!( + "adjusting lockouts (after replay up to {}): {:?}", + replayed_root, + self.voted_slots() + ); + + // sanity assertions for roots + assert_eq!(slot_history.check(replayed_root), Check::Found); + assert!(self.root().is_some()); + let tower_root = self.root().unwrap(); + // reconcile_blockstore_roots_with_tower() should already have aligned these. + assert!( + tower_root <= replayed_root, + format!( + "tower root: {:?} >= replayed root slot: {}", + tower_root, replayed_root + ) + ); + assert!( + self.last_vote == Vote::default() && self.lockouts.votes.is_empty() + || self.last_vote != Vote::default() && !self.lockouts.votes.is_empty(), + format!( + "last vote: {:?} lockouts.votes: {:?}", + self.last_vote, self.lockouts.votes + ) + ); + + // return immediately if votes are empty... 
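+ // (a tower that has never voted carries only its root, so there is nothing to adjust)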
+ if self.lockouts.votes.is_empty() { + return Ok(self); + } + + let last_voted_slot = self.last_voted_slot().unwrap(); + if slot_history.check(last_voted_slot) == Check::TooOld { + // We could try hard to anchor with other older votes, but opt to simplify the + // following logic + return Err(TowerError::TooOldTower( + last_voted_slot, + slot_history.oldest(), + )); + } + + self.do_adjust_lockouts_after_replay(tower_root, replayed_root, slot_history) + } + + fn do_adjust_lockouts_after_replay( + mut self, + tower_root: Slot, + replayed_root: Slot, + slot_history: &SlotHistory, + ) -> Result { + // retained slots will be consisted only from divergent slots + let mut retain_flags_for_each_vote_in_reverse: Vec<_> = + Vec::with_capacity(self.lockouts.votes.len()); + + let mut still_in_future = true; + let mut past_outside_history = false; + let mut checked_slot = None; + let mut anchored_slot = None; + + let mut slots_in_tower = vec![tower_root]; + slots_in_tower.extend(self.voted_slots()); + + // iterate over votes + root (if any) in the newest => oldest order + // bail out early if bad condition is found + for slot_in_tower in slots_in_tower.iter().rev() { + let check = slot_history.check(*slot_in_tower); + + if anchored_slot.is_none() && check == Check::Found { + anchored_slot = Some(*slot_in_tower); + } else if anchored_slot.is_some() && check == Check::NotFound { + // this can't happen unless we're fed with bogus snapshot + return Err(TowerError::FatallyInconsistent("diverged ancestor?")); + } + + if still_in_future && check != Check::Future { + still_in_future = false; + } else if !still_in_future && check == Check::Future { + // really odd cases: bad ordered votes? + return Err(TowerError::FatallyInconsistent("time warped?")); + } + if !past_outside_history && check == Check::TooOld { + past_outside_history = true; + } else if past_outside_history && check != Check::TooOld { + // really odd cases: bad ordered votes? + return Err(TowerError::FatallyInconsistent( + "not too old once after got too old?", + )); + } + + if let Some(checked_slot) = checked_slot { + // This is really special, only if tower is initialized (root = slot 0) for genesis and contains + // a vote (= slot 0) for the genesis, the slot 0 can repeat only once + let voting_from_genesis = *slot_in_tower == checked_slot && *slot_in_tower == 0; + + if !voting_from_genesis { + // Unless we're voting since genesis, slots_in_tower must always be older than last checked_slot + // including all vote slot and the root slot. 
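+ // (a violation here means the saved votes are out of order, which we treat as unrecoverable)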
+ assert!(*slot_in_tower < checked_slot) + } + } + + checked_slot = Some(*slot_in_tower); + + retain_flags_for_each_vote_in_reverse.push(anchored_slot.is_none()); + } + + // Check for errors if not anchored + info!("adjusted tower's anchored slot: {:?}", anchored_slot); + if anchored_slot.is_none() { + // this error really shouldn't happen unless ledger/tower is corrupted + return Err(TowerError::FatallyInconsistent( + "no common slot for rooted tower", + )); + } + + assert_eq!( + slots_in_tower.len(), + retain_flags_for_each_vote_in_reverse.len() + ); + // pop for the tower root + retain_flags_for_each_vote_in_reverse.pop(); + let mut retain_flags_for_each_vote = + retain_flags_for_each_vote_in_reverse.into_iter().rev(); + + let original_votes_len = self.lockouts.votes.len(); + self.do_initialize_lockouts(replayed_root, move |_| { + retain_flags_for_each_vote.next().unwrap() + }); + + if self.lockouts.votes.is_empty() { + info!( + "All restored votes were behind replayed_root({}); resetting root_slot and last_vote in tower!", + replayed_root + ); + // we might not have banks for those votes so just reset. + // That's because the votes may well past replayed_root + self.last_vote = Vote::default(); + } else { + info!( + "{} restored votes (out of {}) were on different fork or are upcoming votes on unrooted slots: {:?}!", + self.voted_slots().len(), + original_votes_len, + self.voted_slots() + ); + + assert_eq!( + self.last_vote.last_voted_slot().unwrap(), + *self.voted_slots().last().unwrap() + ); + self.stray_restored_slot = Some(self.last_vote.last_voted_slot().unwrap()); + } + + Ok(self) + } + + fn initialize_lockouts_from_bank( &mut self, vote_account_pubkey: &Pubkey, root: Slot, - heaviest_bank: &Bank, + bank: &Bank, ) { - if let Some((_stake, vote_account)) = heaviest_bank.vote_accounts().get(vote_account_pubkey) - { - let mut vote_state = VoteState::deserialize(&vote_account.data) + if let Some((_stake, vote_account)) = bank.vote_accounts().get(vote_account_pubkey) { + let vote_state = VoteState::deserialize(&vote_account.data) .expect("vote_account isn't a VoteState?"); - vote_state.root_slot = Some(root); - vote_state.votes.retain(|v| v.slot > root); + self.lockouts = vote_state; + self.do_initialize_lockouts(root, |v| v.slot > root); trace!( "{} lockouts initialized to {:?}", self.node_pubkey, - vote_state + self.lockouts ); assert_eq!( - vote_state.node_pubkey, self.node_pubkey, + self.lockouts.node_pubkey, self.node_pubkey, "vote account's node_pubkey doesn't match", ); - self.lockouts = vote_state; } else { + self.do_initialize_lockouts(root, |_| true); info!( - "vote account({}) not found in heaviest bank (slot={})", + "vote account({}) not found in bank (slot={})", vote_account_pubkey, - heaviest_bank.slot() + bank.slot() ); } } + + fn do_initialize_lockouts bool>(&mut self, root: Slot, should_retain: F) { + // Updating root is needed to correctly restore from newly-saved tower for the next + // boot + self.lockouts.root_slot = Some(root); + self.lockouts.votes.retain(should_retain); + } + + pub fn get_filename(path: &Path, node_pubkey: &Pubkey) -> PathBuf { + path.join(format!("tower-{}", node_pubkey)) + .with_extension("bin") + } + + pub fn get_tmp_filename(path: &Path) -> PathBuf { + path.with_extension("bin.new") + } + + pub fn save(&self, node_keypair: &Arc) -> Result<()> { + let mut measure = Measure::start("tower_save-ms"); + + if self.node_pubkey != node_keypair.pubkey() { + return Err(TowerError::WrongTower(format!( + "node_pubkey is {:?} but found tower for {:?}", 
+ node_keypair.pubkey(), + self.node_pubkey + ))); + } + + let filename = &self.path; + let new_filename = &self.tmp_path; + { + // overwrite anything if exists + let mut file = File::create(&new_filename)?; + let saved_tower = SavedTower::new(self, node_keypair)?; + bincode::serialize_into(&mut file, &saved_tower)?; + // file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster! + } + fs::rename(&new_filename, &filename)?; + // self.path.parent().sync_all() hurts performance same as the above sync + + measure.stop(); + inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize); + + Ok(()) + } + + pub fn restore(path: &Path, node_pubkey: &Pubkey) -> Result { + let filename = Self::get_filename(path, node_pubkey); + + // Ensure to create parent dir here, because restore() precedes save() always + fs::create_dir_all(&filename.parent().unwrap())?; + + let file = File::open(&filename)?; + let mut stream = BufReader::new(file); + + let saved_tower: SavedTower = bincode::deserialize_from(&mut stream)?; + if !saved_tower.verify(node_pubkey) { + return Err(TowerError::InvalidSignature); + } + let mut tower = saved_tower.deserialize()?; + tower.path = filename; + tower.tmp_path = Self::get_tmp_filename(&tower.path); + + // check that the tower actually belongs to this node + if &tower.node_pubkey != node_pubkey { + return Err(TowerError::WrongTower(format!( + "node_pubkey is {:?} but found tower for {:?}", + node_pubkey, tower.node_pubkey + ))); + } + Ok(tower) + } +} + +#[derive(Error, Debug)] +pub enum TowerError { + #[error("IO Error: {0}")] + IOError(#[from] std::io::Error), + + #[error("Serialization Error: {0}")] + SerializeError(#[from] bincode::Error), + + #[error("The signature on the saved tower is invalid")] + InvalidSignature, + + #[error("The tower does not match this validator: {0}")] + WrongTower(String), + + #[error( + "The tower is too old: \ + newest slot in tower ({0}) << oldest slot in available history ({1})" + )] + TooOldTower(Slot, Slot), + + #[error("The tower is fatally inconsistent with blockstore: {0}")] + FatallyInconsistent(&'static str), +} + +#[frozen_abi(digest = "Gaxfwvx5MArn52mKZQgzHmDCyn5YfCuTHvp5Et3rFfpp")] +#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)] +pub struct SavedTower { + signature: Signature, + data: Vec, +} + +impl SavedTower { + pub fn new(tower: &Tower, keypair: &Arc) -> Result { + let data = bincode::serialize(tower)?; + let signature = keypair.sign_message(&data); + Ok(Self { data, signature }) + } + + pub fn verify(&self, pubkey: &Pubkey) -> bool { + self.signature.verify(pubkey.as_ref(), &self.data) + } + + pub fn deserialize(&self) -> Result { + bincode::deserialize(&self.data).map_err(|e| e.into()) + } +} + +// Given an untimely crash, tower may have roots that are not reflected in blockstore because +// `ReplayState::handle_votable_bank()` saves tower before setting blockstore roots +pub fn reconcile_blockstore_roots_with_tower( + tower: &Tower, + blockstore: &Blockstore, +) -> blockstore_db::Result<()> { + if let Some(tower_root) = tower.root() { + let last_blockstore_root = blockstore.last_root(); + if last_blockstore_root < tower_root { + // Ensure tower_root itself to exist and be marked as rooted in the blockstore + // in addition to its ancestors. 
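+ // For example, with a parent chain 4 -> 1 -> 0, a tower root of 4 and a last blockstore root of 0,
+ // the inclusive walk below collects new_roots = [4, 1].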
+ let new_roots: Vec<_> = AncestorIterator::new_inclusive(tower_root, &blockstore) + .take_while(|current| match current.cmp(&last_blockstore_root) { + Ordering::Greater => true, + Ordering::Equal => false, + Ordering::Less => panic!( + "couldn't find a last_blockstore_root upwards from: {}!?", + tower_root + ), + }) + .collect(); + assert!( + !new_roots.is_empty(), + "at least 1 parent slot must be found" + ); + + blockstore.set_roots(&new_roots)? + } + } + Ok(()) } #[cfg(test)] @@ -681,6 +1114,7 @@ pub mod test { progress_map::ForkProgress, replay_stage::{HeaviestForkFailures, ReplayStage}, }; + use solana_ledger::{blockstore::make_slot_entries, get_tmp_ledger_path}; use solana_runtime::{ bank::Bank, bank_forks::BankForks, @@ -688,12 +1122,21 @@ pub mod test { create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, }, }; - use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signer}; + use solana_sdk::{ + clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signer, slot_history::SlotHistory, + }; use solana_vote_program::{ vote_state::{Vote, VoteStateVersions, MAX_LOCKOUT_HISTORY}, vote_transaction, }; - use std::{collections::HashMap, rc::Rc, sync::RwLock}; + use std::{ + collections::HashMap, + fs::{remove_file, OpenOptions}, + io::{Read, Seek, SeekFrom, Write}, + rc::Rc, + sync::RwLock, + }; + use tempfile::TempDir; use trees::{tr, Tree, TreeWalk}; pub(crate) struct VoteSimulator { @@ -1837,4 +2280,644 @@ pub mod test { tower.last_timestamp.timestamp += 1_000_000; // Move last_timestamp well into the future assert!(tower.maybe_timestamp(3).is_none()); // slot 3 gets no timestamp } + + fn run_test_load_tower_snapshot( + modify_original: F, + modify_serialized: G, + ) -> (Tower, Result) + where + F: Fn(&mut Tower, &Pubkey), + G: Fn(&PathBuf), + { + let dir = TempDir::new().unwrap(); + let identity_keypair = Arc::new(Keypair::new()); + + // Use values that will not match the default derived from BankForks + let mut tower = Tower::new_for_tests(10, 0.9); + tower.path = Tower::get_filename(&dir.path().to_path_buf(), &identity_keypair.pubkey()); + tower.tmp_path = Tower::get_tmp_filename(&tower.path); + + modify_original(&mut tower, &identity_keypair.pubkey()); + + tower.save(&identity_keypair).unwrap(); + modify_serialized(&tower.path); + let loaded = Tower::restore(&dir.path(), &identity_keypair.pubkey()); + + (tower, loaded) + } + + #[test] + fn test_switch_threshold_across_tower_reload() { + solana_logger::setup(); + // Init state + let mut vote_simulator = VoteSimulator::new(2); + let my_pubkey = vote_simulator.node_pubkeys[0]; + let other_vote_account = vote_simulator.vote_pubkeys[1]; + let bank0 = vote_simulator + .bank_forks + .read() + .unwrap() + .get(0) + .unwrap() + .clone(); + let total_stake = bank0.total_epoch_stake(); + assert_eq!( + total_stake, + vote_simulator.validator_keypairs.len() as u64 * 10_000 + ); + + // Create the tree of banks + let forks = tr(0) + / (tr(1) + / (tr(2) + / tr(10) + / (tr(43) + / (tr(44) + // Minor fork 2 + / (tr(45) / (tr(46) / (tr(47) / (tr(48) / (tr(49) / (tr(50))))))) + / (tr(110) / tr(111)))))); + + // Fill the BankForks according to the above fork structure + vote_simulator.fill_bank_forks(forks, &HashMap::new()); + for (_, fork_progress) in vote_simulator.progress.iter_mut() { + fork_progress.fork_stats.computed = true; + } + + let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors(); + let descendants = vote_simulator.bank_forks.read().unwrap().descendants(); + let mut tower = 
Tower::new_with_key(&my_pubkey); + + tower.record_vote(43, Hash::default()); + tower.record_vote(44, Hash::default()); + tower.record_vote(45, Hash::default()); + tower.record_vote(46, Hash::default()); + tower.record_vote(47, Hash::default()); + tower.record_vote(48, Hash::default()); + tower.record_vote(49, Hash::default()); + + // Trying to switch to a descendant of last vote should always work + assert_eq!( + tower.check_switch_threshold( + 50, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + ), + SwitchForkDecision::NoSwitch + ); + + // Trying to switch to another fork at 110 should fail + assert_eq!( + tower.check_switch_threshold( + 110, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + ), + SwitchForkDecision::FailedSwitchThreshold + ); + + vote_simulator.simulate_lockout_interval(111, (10, 49), &other_vote_account); + + assert_eq!( + tower.check_switch_threshold( + 110, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + ), + SwitchForkDecision::SwitchProof(Hash::default()) + ); + + assert_eq!(tower.voted_slots(), vec![43, 44, 45, 46, 47, 48, 49]); + { + let mut tower = tower.clone(); + tower.record_vote(110, Hash::default()); + tower.record_vote(111, Hash::default()); + assert_eq!(tower.voted_slots(), vec![43, 110, 111]); + assert_eq!(tower.lockouts.root_slot, Some(0)); + } + + // Prepare simulated validator restart! + let mut vote_simulator = VoteSimulator::new(2); + let other_vote_account = vote_simulator.vote_pubkeys[1]; + let bank0 = vote_simulator + .bank_forks + .read() + .unwrap() + .get(0) + .unwrap() + .clone(); + let total_stake = bank0.total_epoch_stake(); + let forks = tr(0) + / (tr(1) + / (tr(2) + / tr(10) + / (tr(43) + / (tr(44) + // Minor fork 2 + / (tr(45) / (tr(46) / (tr(47) / (tr(48) / (tr(49) / (tr(50))))))) + / (tr(110) / tr(111)))))); + let replayed_root_slot = 44; + + // Fill the BankForks according to the above fork structure + vote_simulator.fill_bank_forks(forks, &HashMap::new()); + for (_, fork_progress) in vote_simulator.progress.iter_mut() { + fork_progress.fork_stats.computed = true; + } + + // prepend tower restart! 
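+ // Build a SlotHistory covering only the rooted chain (0, 1, 2, 43, 44) and adjust the saved tower
+ // against it: votes 45..=49 are not in that history, so they survive and 49 becomes the stray restored last vote.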
+ let mut slot_history = SlotHistory::default(); + vote_simulator.set_root(replayed_root_slot); + let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors(); + let descendants = vote_simulator.bank_forks.read().unwrap().descendants(); + for slot in &[0, 1, 2, 43, replayed_root_slot] { + slot_history.add(*slot); + } + let mut tower = tower + .adjust_lockouts_after_replay(replayed_root_slot, &slot_history) + .unwrap(); + + assert_eq!(tower.voted_slots(), vec![45, 46, 47, 48, 49]); + + // Trying to switch to another fork at 110 should fail + assert_eq!( + tower.check_switch_threshold( + 110, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + ), + SwitchForkDecision::FailedSwitchThreshold + ); + + // Add lockout_interval which should be excluded + vote_simulator.simulate_lockout_interval(111, (45, 50), &other_vote_account); + assert_eq!( + tower.check_switch_threshold( + 110, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + ), + SwitchForkDecision::FailedSwitchThreshold + ); + + // Add lockout_interval which should not be excluded + vote_simulator.simulate_lockout_interval(111, (110, 200), &other_vote_account); + assert_eq!( + tower.check_switch_threshold( + 110, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + ), + SwitchForkDecision::SwitchProof(Hash::default()) + ); + + tower.record_vote(110, Hash::default()); + tower.record_vote(111, Hash::default()); + assert_eq!(tower.voted_slots(), vec![110, 111]); + assert_eq!(tower.lockouts.root_slot, Some(replayed_root_slot)); + } + + #[test] + fn test_load_tower_ok() { + let (tower, loaded) = + run_test_load_tower_snapshot(|tower, pubkey| tower.node_pubkey = *pubkey, |_| ()); + let loaded = loaded.unwrap(); + assert_eq!(loaded, tower); + assert_eq!(tower.threshold_depth, 10); + assert!((tower.threshold_size - 0.9_f64).abs() < f64::EPSILON); + assert_eq!(loaded.threshold_depth, 10); + assert!((loaded.threshold_size - 0.9_f64).abs() < f64::EPSILON); + } + + #[test] + fn test_load_tower_wrong_identity() { + let identity_keypair = Arc::new(Keypair::new()); + let tower = Tower::new_with_key(&Pubkey::default()); + assert_matches!( + tower.save(&identity_keypair), + Err(TowerError::WrongTower(_)) + ) + } + + #[test] + fn test_load_tower_invalid_signature() { + let (_, loaded) = run_test_load_tower_snapshot( + |tower, pubkey| tower.node_pubkey = *pubkey, + |path| { + let mut file = OpenOptions::new() + .read(true) + .write(true) + .open(path) + .unwrap(); + let mut buf = [0u8]; + assert_eq!(file.read(&mut buf).unwrap(), 1); + buf[0] = !buf[0]; + assert_eq!(file.seek(SeekFrom::Start(0)).unwrap(), 0); + assert_eq!(file.write(&buf).unwrap(), 1); + }, + ); + assert_matches!(loaded, Err(TowerError::InvalidSignature)) + } + + #[test] + fn test_load_tower_deser_failure() { + let (_, loaded) = run_test_load_tower_snapshot( + |tower, pubkey| tower.node_pubkey = *pubkey, + |path| { + OpenOptions::new() + .write(true) + .truncate(true) + .open(&path) + .unwrap_or_else(|_| panic!("Failed to truncate file: {:?}", path)); + }, + ); + assert_matches!(loaded, Err(TowerError::SerializeError(_))) + } + + #[test] + fn test_load_tower_missing() { + let (_, loaded) = run_test_load_tower_snapshot( + |tower, pubkey| tower.node_pubkey = *pubkey, + |path| { + remove_file(path).unwrap(); + }, + ); + assert_matches!(loaded, Err(TowerError::IOError(_))) + } + + #[test] + fn 
test_reconcile_blockstore_roots_with_tower_normal() { + solana_logger::setup(); + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + + let (shreds, _) = make_slot_entries(1, 0, 42); + blockstore.insert_shreds(shreds, None, false).unwrap(); + let (shreds, _) = make_slot_entries(3, 1, 42); + blockstore.insert_shreds(shreds, None, false).unwrap(); + let (shreds, _) = make_slot_entries(4, 1, 42); + blockstore.insert_shreds(shreds, None, false).unwrap(); + assert!(!blockstore.is_root(0)); + assert!(!blockstore.is_root(1)); + assert!(!blockstore.is_root(3)); + assert!(!blockstore.is_root(4)); + + let mut tower = Tower::new_with_key(&Pubkey::default()); + tower.lockouts.root_slot = Some(4); + reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap(); + + assert!(!blockstore.is_root(0)); + assert!(blockstore.is_root(1)); + assert!(!blockstore.is_root(3)); + assert!(blockstore.is_root(4)); + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } + + #[test] + #[should_panic(expected = "couldn't find a last_blockstore_root upwards from: 4!?")] + fn test_reconcile_blockstore_roots_with_tower_panic_no_common_root() { + solana_logger::setup(); + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + + let (shreds, _) = make_slot_entries(1, 0, 42); + blockstore.insert_shreds(shreds, None, false).unwrap(); + let (shreds, _) = make_slot_entries(3, 1, 42); + blockstore.insert_shreds(shreds, None, false).unwrap(); + let (shreds, _) = make_slot_entries(4, 1, 42); + blockstore.insert_shreds(shreds, None, false).unwrap(); + blockstore.set_roots(&[3]).unwrap(); + assert!(!blockstore.is_root(0)); + assert!(!blockstore.is_root(1)); + assert!(blockstore.is_root(3)); + assert!(!blockstore.is_root(4)); + + let mut tower = Tower::new_with_key(&Pubkey::default()); + tower.lockouts.root_slot = Some(4); + reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap(); + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } + + #[test] + #[should_panic(expected = "at least 1 parent slot must be found")] + fn test_reconcile_blockstore_roots_with_tower_panic_no_parent() { + solana_logger::setup(); + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + + let (shreds, _) = make_slot_entries(1, 0, 42); + blockstore.insert_shreds(shreds, None, false).unwrap(); + let (shreds, _) = make_slot_entries(3, 1, 42); + blockstore.insert_shreds(shreds, None, false).unwrap(); + assert!(!blockstore.is_root(0)); + assert!(!blockstore.is_root(1)); + assert!(!blockstore.is_root(3)); + + let mut tower = Tower::new_with_key(&Pubkey::default()); + tower.lockouts.root_slot = Some(4); + reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap(); + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } + + #[test] + fn test_adjust_lockouts_after_replay_future_slots() { + solana_logger::setup(); + let mut tower = Tower::new_for_tests(10, 0.9); + tower.record_vote(0, Hash::default()); + tower.record_vote(1, Hash::default()); + tower.record_vote(2, Hash::default()); + tower.record_vote(3, Hash::default()); + + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + slot_history.add(1); + + let replayed_root_slot = 1; + tower = tower + .adjust_lockouts_after_replay(replayed_root_slot, &slot_history) 
+ .unwrap(); + + assert_eq!(tower.voted_slots(), vec![2, 3]); + assert_eq!(tower.root(), Some(replayed_root_slot)); + + tower = tower + .adjust_lockouts_after_replay(replayed_root_slot, &slot_history) + .unwrap(); + assert_eq!(tower.voted_slots(), vec![2, 3]); + assert_eq!(tower.root(), Some(replayed_root_slot)); + } + + #[test] + fn test_adjust_lockouts_after_replay_not_found_slots() { + let mut tower = Tower::new_for_tests(10, 0.9); + tower.record_vote(0, Hash::default()); + tower.record_vote(1, Hash::default()); + tower.record_vote(2, Hash::default()); + tower.record_vote(3, Hash::default()); + + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + slot_history.add(1); + slot_history.add(4); + + let replayed_root_slot = 4; + tower = tower + .adjust_lockouts_after_replay(replayed_root_slot, &slot_history) + .unwrap(); + + assert_eq!(tower.voted_slots(), vec![2, 3]); + assert_eq!(tower.root(), Some(replayed_root_slot)); + } + + #[test] + fn test_adjust_lockouts_after_replay_all_rooted_with_no_too_old() { + let mut tower = Tower::new_for_tests(10, 0.9); + tower.record_vote(0, Hash::default()); + tower.record_vote(1, Hash::default()); + tower.record_vote(2, Hash::default()); + + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + slot_history.add(1); + slot_history.add(2); + slot_history.add(3); + slot_history.add(4); + slot_history.add(5); + + let replayed_root_slot = 5; + tower = tower + .adjust_lockouts_after_replay(replayed_root_slot, &slot_history) + .unwrap(); + + assert_eq!(tower.voted_slots(), vec![] as Vec); + assert_eq!(tower.root(), Some(replayed_root_slot)); + assert_eq!(tower.stray_restored_slot, None); + } + + #[test] + fn test_adjust_lockouts_after_relay_all_rooted_with_too_old() { + use solana_sdk::slot_history::MAX_ENTRIES; + + let mut tower = Tower::new_for_tests(10, 0.9); + tower.record_vote(0, Hash::default()); + tower.record_vote(1, Hash::default()); + tower.record_vote(2, Hash::default()); + + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + slot_history.add(1); + slot_history.add(2); + slot_history.add(MAX_ENTRIES); + + tower = tower + .adjust_lockouts_after_replay(MAX_ENTRIES, &slot_history) + .unwrap(); + assert_eq!(tower.voted_slots(), vec![] as Vec); + assert_eq!(tower.root(), Some(MAX_ENTRIES)); + } + + #[test] + fn test_adjust_lockouts_after_replay_anchored_future_slots() { + let mut tower = Tower::new_for_tests(10, 0.9); + tower.record_vote(0, Hash::default()); + tower.record_vote(1, Hash::default()); + tower.record_vote(2, Hash::default()); + tower.record_vote(3, Hash::default()); + tower.record_vote(4, Hash::default()); + + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + slot_history.add(1); + slot_history.add(2); + + let replayed_root_slot = 2; + tower = tower + .adjust_lockouts_after_replay(replayed_root_slot, &slot_history) + .unwrap(); + + assert_eq!(tower.voted_slots(), vec![3, 4]); + assert_eq!(tower.root(), Some(replayed_root_slot)); + } + + #[test] + fn test_adjust_lockouts_after_replay_all_not_found() { + let mut tower = Tower::new_for_tests(10, 0.9); + tower.record_vote(5, Hash::default()); + tower.record_vote(6, Hash::default()); + + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + slot_history.add(1); + slot_history.add(2); + slot_history.add(7); + + let replayed_root_slot = 7; + tower = tower + .adjust_lockouts_after_replay(replayed_root_slot, &slot_history) + .unwrap(); + + assert_eq!(tower.voted_slots(), vec![5, 6]); + 
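// Slots 5 and 6 are absent from the slot history, so they are retained as divergent votes rather than discarded.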
assert_eq!(tower.root(), Some(replayed_root_slot)); + } + + #[test] + fn test_adjust_lockouts_after_replay_all_not_found_even_if_rooted() { + let mut tower = Tower::new_for_tests(10, 0.9); + tower.lockouts.root_slot = Some(4); + tower.record_vote(5, Hash::default()); + tower.record_vote(6, Hash::default()); + + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + slot_history.add(1); + slot_history.add(2); + slot_history.add(7); + + let replayed_root_slot = 7; + let result = tower.adjust_lockouts_after_replay(replayed_root_slot, &slot_history); + + assert_eq!( + format!("{}", result.unwrap_err()), + "The tower is fatally inconsistent with blockstore: no common slot for rooted tower" + ); + } + + #[test] + fn test_adjust_lockouts_after_replay_all_future_votes_only_root_found() { + let mut tower = Tower::new_for_tests(10, 0.9); + tower.lockouts.root_slot = Some(2); + tower.record_vote(3, Hash::default()); + tower.record_vote(4, Hash::default()); + tower.record_vote(5, Hash::default()); + + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + slot_history.add(1); + slot_history.add(2); + + let replayed_root_slot = 2; + tower = tower + .adjust_lockouts_after_replay(replayed_root_slot, &slot_history) + .unwrap(); + + assert_eq!(tower.voted_slots(), vec![3, 4, 5]); + assert_eq!(tower.root(), Some(replayed_root_slot)); + } + + #[test] + fn test_adjust_lockouts_after_replay_empty() { + let mut tower = Tower::new_for_tests(10, 0.9); + + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + + let replayed_root_slot = 0; + tower = tower + .adjust_lockouts_after_replay(replayed_root_slot, &slot_history) + .unwrap(); + + assert_eq!(tower.voted_slots(), vec![] as Vec); + assert_eq!(tower.root(), Some(replayed_root_slot)); + } + + #[test] + fn test_adjust_lockouts_after_replay_too_old_tower() { + use solana_sdk::slot_history::MAX_ENTRIES; + + let mut tower = Tower::new_for_tests(10, 0.9); + tower.record_vote(0, Hash::default()); + + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + slot_history.add(MAX_ENTRIES); + + let result = tower.adjust_lockouts_after_replay(MAX_ENTRIES, &slot_history); + assert_eq!( + format!("{}", result.unwrap_err()), + "The tower is too old: newest slot in tower (0) << oldest slot in available history (1)" + ); + } + + #[test] + fn test_adjust_lockouts_after_replay_time_warped() { + let mut tower = Tower::new_for_tests(10, 0.9); + tower.lockouts.votes.push_back(Lockout::new(1)); + tower.lockouts.votes.push_back(Lockout::new(0)); + let vote = Vote::new(vec![0], Hash::default()); + tower.last_vote = vote; + + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + + let result = tower.adjust_lockouts_after_replay(0, &slot_history); + assert_eq!( + format!("{}", result.unwrap_err()), + "The tower is fatally inconsistent with blockstore: time warped?" + ); + } + + #[test] + fn test_adjust_lockouts_after_replay_diverged_ancestor() { + let mut tower = Tower::new_for_tests(10, 0.9); + tower.lockouts.votes.push_back(Lockout::new(1)); + tower.lockouts.votes.push_back(Lockout::new(2)); + let vote = Vote::new(vec![2], Hash::default()); + tower.last_vote = vote; + + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + slot_history.add(2); + + let result = tower.adjust_lockouts_after_replay(2, &slot_history); + assert_eq!( + format!("{}", result.unwrap_err()), + "The tower is fatally inconsistent with blockstore: diverged ancestor?" 
+ ); + } + + #[test] + fn test_adjust_lockouts_after_replay_out_of_order() { + use solana_sdk::slot_history::MAX_ENTRIES; + + let mut tower = Tower::new_for_tests(10, 0.9); + tower + .lockouts + .votes + .push_back(Lockout::new(MAX_ENTRIES - 1)); + tower.lockouts.votes.push_back(Lockout::new(0)); + tower.lockouts.votes.push_back(Lockout::new(1)); + let vote = Vote::new(vec![1], Hash::default()); + tower.last_vote = vote; + + let mut slot_history = SlotHistory::default(); + slot_history.add(MAX_ENTRIES); + + let result = tower.adjust_lockouts_after_replay(MAX_ENTRIES, &slot_history); + assert_eq!( + format!("{}", result.unwrap_err()), + "The tower is fatally inconsistent with blockstore: not too old once after got too old?" + ); + } } diff --git a/core/src/heaviest_subtree_fork_choice.rs b/core/src/heaviest_subtree_fork_choice.rs index 999a667ac..c1dbdb1b6 100644 --- a/core/src/heaviest_subtree_fork_choice.rs +++ b/core/src/heaviest_subtree_fork_choice.rs @@ -491,6 +491,44 @@ impl HeaviestSubtreeForkChoice { ); } + fn heaviest_slot_on_same_voted_fork(&self, tower: &Tower) -> Option { + tower + .last_voted_slot() + .map(|last_voted_slot| { + let heaviest_slot_on_same_voted_fork = self.best_slot(last_voted_slot); + if heaviest_slot_on_same_voted_fork.is_none() { + if !tower.is_stray_last_vote() { + // Unless last vote is stray, self.bast_slot(last_voted_slot) must return + // Some(_), justifying to panic! here. + // Also, adjust_lockouts_after_replay() correctly makes last_voted_slot None, + // if all saved votes are ancestors of replayed_root_slot. So this code shouldn't be + // touched in that case as well. + // In other words, except being stray, all other slots have been voted on while this + // validator has been running, so we must be able to fetch best_slots for all of + // them. + panic!( + "a bank at last_voted_slot({}) is a frozen bank so must have been\ + added to heaviest_subtree_fork_choice at time of freezing", + last_voted_slot, + ) + } else { + // fork_infos doesn't have corresponding data for the stray restored last vote, + // meaning some inconsistency between saved tower and ledger. + // (newer snapshot, or only a saved tower is moved over to new setup?) 
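+ // Returning None here just skips the heaviest-bank-on-the-same-voted-fork hint; overall fork
+ // selection still uses best_overall_slot().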
+ return None; + } + } + let heaviest_slot_on_same_voted_fork = heaviest_slot_on_same_voted_fork.unwrap(); + + if heaviest_slot_on_same_voted_fork == last_voted_slot { + None + } else { + Some(heaviest_slot_on_same_voted_fork) + } + }) + .unwrap_or(None) + } + #[cfg(test)] fn set_stake_voted_at(&mut self, slot: Slot, stake_voted_at: u64) { self.fork_infos.get_mut(&slot).unwrap().stake_voted_at = stake_voted_at; @@ -550,26 +588,17 @@ impl ForkChoice for HeaviestSubtreeForkChoice { _ancestors: &HashMap>, bank_forks: &RwLock, ) -> (Arc, Option>) { - let last_voted_slot = tower.last_voted_slot(); - let heaviest_slot_on_same_voted_fork = last_voted_slot.map(|last_voted_slot| { - let heaviest_slot_on_same_voted_fork = - self.best_slot(last_voted_slot).expect("a bank at last_voted_slot is a frozen bank so must have been added to heaviest_subtree_fork_choice at time of freezing"); - if heaviest_slot_on_same_voted_fork == last_voted_slot { - None - } else { - Some(heaviest_slot_on_same_voted_fork) - } - }).unwrap_or(None); - let heaviest_slot = self.best_overall_slot(); let r_bank_forks = bank_forks.read().unwrap(); + ( - r_bank_forks.get(heaviest_slot).unwrap().clone(), - heaviest_slot_on_same_voted_fork.map(|heaviest_slot_on_same_voted_fork| { - r_bank_forks - .get(heaviest_slot_on_same_voted_fork) - .unwrap() - .clone() - }), + r_bank_forks.get(self.best_overall_slot()).unwrap().clone(), + self.heaviest_slot_on_same_voted_fork(tower) + .map(|heaviest_slot_on_same_voted_fork| { + r_bank_forks + .get(heaviest_slot_on_same_voted_fork) + .unwrap() + .clone() + }), ) } } @@ -611,6 +640,7 @@ mod test { use super::*; use crate::consensus::test::VoteSimulator; use solana_runtime::{bank::Bank, bank_utils}; + use solana_sdk::{hash::Hash, slot_history::SlotHistory}; use std::{collections::HashSet, ops::Range}; use trees::tr; @@ -1490,6 +1520,48 @@ mod test { assert!(heaviest_subtree_fork_choice.subtree_diff(0, 6).is_empty()); } + #[test] + fn test_stray_restored_slot() { + let forks = tr(0) / (tr(1) / tr(2)); + let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_tree(forks); + + let mut tower = Tower::new_for_tests(10, 0.9); + tower.record_vote(1, Hash::default()); + + assert_eq!(tower.is_stray_last_vote(), false); + assert_eq!( + heaviest_subtree_fork_choice.heaviest_slot_on_same_voted_fork(&tower), + Some(2) + ); + + // Make slot 1 (existing in bank_forks) a restored stray slot + let mut slot_history = SlotHistory::default(); + slot_history.add(0); + // Work around TooOldSlotHistory + slot_history.add(999); + tower = tower + .adjust_lockouts_after_replay(0, &slot_history) + .unwrap(); + + assert_eq!(tower.is_stray_last_vote(), true); + assert_eq!( + heaviest_subtree_fork_choice.heaviest_slot_on_same_voted_fork(&tower), + Some(2) + ); + + // Make slot 3 (NOT existing in bank_forks) a restored stray slot + tower.record_vote(3, Hash::default()); + tower = tower + .adjust_lockouts_after_replay(0, &slot_history) + .unwrap(); + + assert_eq!(tower.is_stray_last_vote(), true); + assert_eq!( + heaviest_subtree_fork_choice.heaviest_slot_on_same_voted_fork(&tower), + None + ); + } + fn setup_forks() -> HeaviestSubtreeForkChoice { /* Build fork structure: diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 347ba1d4c..1c96ed450 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -219,6 +219,7 @@ impl ReplayStage { cluster_info: Arc, ledger_signal_receiver: Receiver, poh_recorder: Arc>, + mut tower: Tower, vote_tracker: Arc, cluster_slots: Arc, 
retransmit_slots_sender: RetransmitSlotsSender, @@ -255,53 +256,16 @@ impl ReplayStage { let mut all_pubkeys = PubkeyReferences::default(); let verify_recyclers = VerifyRecyclers::default(); let _exit = Finalizer::new(exit.clone()); - let mut progress = ProgressMap::default(); - let mut frozen_banks: Vec<_> = bank_forks - .read() - .unwrap() - .frozen_banks() - .values() - .cloned() - .collect(); - - frozen_banks.sort_by_key(|bank| bank.slot()); - - // Initialize progress map with any root banks - for bank in &frozen_banks { - let prev_leader_slot = progress.get_bank_prev_leader_slot(bank); - progress.insert( - bank.slot(), - ForkProgress::new_from_bank( - bank, - &my_pubkey, - &vote_account, - prev_leader_slot, - 0, - 0, - ), - ); - } - let root_bank = bank_forks.read().unwrap().root_bank().clone(); - let root = root_bank.slot(); - let unlock_heaviest_subtree_fork_choice_slot = - Self::get_unlock_heaviest_subtree_fork_choice(root_bank.cluster_type()); - let mut heaviest_subtree_fork_choice = - HeaviestSubtreeForkChoice::new_from_frozen_banks(root, &frozen_banks); + let ( + mut progress, + mut heaviest_subtree_fork_choice, + unlock_heaviest_subtree_fork_choice_slot, + ) = Self::initialize_progress_and_fork_choice_with_locked_bank_forks( + &bank_forks, + &my_pubkey, + &vote_account, + ); let mut bank_weight_fork_choice = BankWeightForkChoice::default(); - let heaviest_bank = if root > unlock_heaviest_subtree_fork_choice_slot { - bank_forks - .read() - .unwrap() - .get(heaviest_subtree_fork_choice.best_overall_slot()) - .expect( - "The best overall slot must be one of `frozen_banks` which all - exist in bank_forks", - ) - .clone() - } else { - Tower::find_heaviest_bank(&bank_forks, &my_pubkey).unwrap_or(root_bank) - }; - let mut tower = Tower::new(&my_pubkey, &vote_account, root, &heaviest_bank); let mut current_leader = None; let mut last_reset = Hash::default(); let mut partition_exists = false; @@ -652,6 +616,65 @@ impl ReplayStage { .unwrap_or(true) } + fn initialize_progress_and_fork_choice_with_locked_bank_forks( + bank_forks: &RwLock, + my_pubkey: &Pubkey, + vote_account: &Pubkey, + ) -> (ProgressMap, HeaviestSubtreeForkChoice, Slot) { + let (root_bank, frozen_banks) = { + let bank_forks = bank_forks.read().unwrap(); + ( + bank_forks.root_bank().clone(), + bank_forks.frozen_banks().values().cloned().collect(), + ) + }; + + Self::initialize_progress_and_fork_choice( + &root_bank, + frozen_banks, + &my_pubkey, + &vote_account, + ) + } + + pub(crate) fn initialize_progress_and_fork_choice( + root_bank: &Arc, + mut frozen_banks: Vec>, + my_pubkey: &Pubkey, + vote_account: &Pubkey, + ) -> (ProgressMap, HeaviestSubtreeForkChoice, Slot) { + let mut progress = ProgressMap::default(); + + frozen_banks.sort_by_key(|bank| bank.slot()); + + // Initialize progress map with any root banks + for bank in &frozen_banks { + let prev_leader_slot = progress.get_bank_prev_leader_slot(bank); + progress.insert( + bank.slot(), + ForkProgress::new_from_bank( + bank, + &my_pubkey, + &vote_account, + prev_leader_slot, + 0, + 0, + ), + ); + } + let root = root_bank.slot(); + let unlock_heaviest_subtree_fork_choice_slot = + Self::get_unlock_heaviest_subtree_fork_choice(root_bank.cluster_type()); + let heaviest_subtree_fork_choice = + HeaviestSubtreeForkChoice::new_from_frozen_banks(root, &frozen_banks); + + ( + progress, + heaviest_subtree_fork_choice, + unlock_heaviest_subtree_fork_choice_slot, + ) + } + fn report_memory( allocated: &solana_measure::thread_mem_usage::Allocatedp, name: &'static str, @@ -1015,7 
+1038,15 @@ impl ReplayStage { } trace!("handle votable bank {}", bank.slot()); let (vote, tower_index) = tower.new_vote_from_bank(bank, vote_account_pubkey); - if let Some(new_root) = tower.record_bank_vote(vote) { + let new_root = tower.record_bank_vote(vote); + let last_vote = tower.last_vote_and_timestamp(); + + if let Err(err) = tower.save(&cluster_info.keypair) { + error!("Unable to save tower: {:?}", err); + std::process::exit(1); + } + + if let Some(new_root) = new_root { // get the root bank before squash let root_bank = bank_forks .read() @@ -1075,7 +1106,7 @@ impl ReplayStage { bank, vote_account_pubkey, authorized_voter_keypairs, - tower.last_vote_and_timestamp(), + last_vote, tower_index, switch_fork_decision, ); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 6b06a8a9b..489e0768b 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -10,6 +10,7 @@ use crate::{ cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker}, cluster_slots::ClusterSlots, completed_data_sets_service::CompletedDataSetsSender, + consensus::Tower, ledger_cleanup_service::LedgerCleanupService, poh_recorder::PohRecorder, replay_stage::{ReplayStage, ReplayStageConfig}, @@ -90,6 +91,7 @@ impl Tvu { ledger_signal_receiver: Receiver, subscriptions: &Arc, poh_recorder: &Arc>, + tower: Tower, leader_schedule_cache: &Arc, exit: &Arc, completed_slots_receiver: CompletedSlotsReceiver, @@ -203,6 +205,7 @@ impl Tvu { cluster_info.clone(), ledger_signal_receiver, poh_recorder.clone(), + tower, vote_tracker, cluster_slots, retransmit_slots_sender, @@ -301,6 +304,7 @@ pub mod tests { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (completed_data_sets_sender, _completed_data_sets_receiver) = unbounded(); let bank_forks = Arc::new(RwLock::new(bank_forks)); + let tower = Tower::new_with_key(&target1_keypair.pubkey()); let tvu = Tvu::new( &vote_keypair.pubkey(), vec![Arc::new(vote_keypair)], @@ -322,6 +326,7 @@ pub mod tests { block_commitment_cache.clone(), )), &poh_recorder, + tower, &leader_schedule_cache, &exit, completed_slots_receiver, diff --git a/core/src/validator.rs b/core/src/validator.rs index 002b1b748..cfb02027f 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -6,6 +6,7 @@ use crate::{ cluster_info::{ClusterInfo, Node}, cluster_info_vote_listener::VoteTracker, completed_data_sets_service::CompletedDataSetsService, + consensus::{reconcile_blockstore_roots_with_tower, Tower, TowerError}, contact_info::ContactInfo, gossip_service::{discover_cluster, GossipService}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, @@ -95,6 +96,7 @@ pub struct ValidatorConfig { pub accounts_hash_interval_slots: u64, pub max_genesis_archive_unpacked_size: u64, pub wal_recovery_mode: Option, + pub require_tower: bool, } impl Default for ValidatorConfig { @@ -125,6 +127,7 @@ impl Default for ValidatorConfig { accounts_hash_interval_slots: std::u64::MAX, max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, wal_recovery_mode: None, + require_tower: false, } } } @@ -253,7 +256,8 @@ impl Validator { cache_block_time_sender, cache_block_time_service, }, - ) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit); + tower, + ) = new_banks_from_ledger(&id, vote_account, config, ledger_path, poh_verify, &exit); let leader_schedule_cache = Arc::new(leader_schedule_cache); let bank = bank_forks.working_bank(); @@ -475,6 +479,7 @@ impl Validator { ledger_signal_receiver, &subscriptions, &poh_recorder, + tower, &leader_schedule_cache, &exit, 
completed_slots_receiver, @@ -613,8 +618,81 @@ impl Validator { } } +fn active_vote_account_exists_in_bank(bank: &Arc, vote_account: &Pubkey) -> bool { + if let Some(account) = &bank.get_account(vote_account) { + if let Some(vote_state) = VoteState::from(&account) { + return !vote_state.votes.is_empty(); + } + } + false +} + +fn post_process_restored_tower( + restored_tower: crate::consensus::Result, + validator_identity: &Pubkey, + vote_account: &Pubkey, + config: &ValidatorConfig, + ledger_path: &Path, + bank_forks: &BankForks, +) -> Tower { + restored_tower + .and_then(|tower| { + let root_bank = bank_forks.root_bank(); + let slot_history = root_bank.get_slot_history(); + tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history) + }) + .unwrap_or_else(|err| { + let voting_has_been_active = + active_vote_account_exists_in_bank(&bank_forks.working_bank(), &vote_account); + let saved_tower_is_missing = if let TowerError::IOError(io_err) = &err { + io_err.kind() == std::io::ErrorKind::NotFound + } else { + false + }; + if !saved_tower_is_missing { + datapoint_error!( + "tower_error", + ( + "error", + format!("Unable to restore tower: {}", err), + String + ), + ); + } + if config.require_tower && voting_has_been_active { + error!("Requested mandatory tower restore failed: {}", err); + error!( + "And there is an existing vote_account containing actual votes. \ + Aborting due to possible conflicting duplicate votes" + ); + process::exit(1); + } + if saved_tower_is_missing && !voting_has_been_active { + // Currently, don't protect against spoofed snapshots with no tower at all + info!( + "Ignoring expected failed tower restore because this is the initial \ + validator start with the vote account..." + ); + } else { + error!( + "Rebuilding a new tower from the latest vote account due to failed tower restore: {}", + err + ); + } + + Tower::new_from_bankforks( + &bank_forks, + &ledger_path, + &validator_identity, + &vote_account, + ) + }) +} + #[allow(clippy::type_complexity)] fn new_banks_from_ledger( + validator_identity: &Pubkey, + vote_account: &Pubkey, config: &ValidatorConfig, ledger_path: &Path, poh_verify: bool, @@ -628,6 +706,7 @@ fn new_banks_from_ledger( LeaderScheduleCache, Option<(Slot, Hash)>, TransactionHistoryServices, + Tower, ) { info!("loading ledger from {:?}...", ledger_path); let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size); @@ -659,6 +738,14 @@ fn new_banks_from_ledger( .expect("Failed to open ledger database"); blockstore.set_no_compaction(config.no_rocksdb_compaction); + let restored_tower = Tower::restore(ledger_path, &validator_identity); + if let Ok(tower) = &restored_tower { + reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap_or_else(|err| { + error!("Failed to reconcile blockstore with tower: {:?}", err); + std::process::exit(1); + }); + } + let process_options = blockstore_processor::ProcessOptions { poh_verify, dev_halt_at_slot: config.dev_halt_at_slot, @@ -690,6 +777,17 @@ fn new_banks_from_ledger( process::exit(1); }); + let tower = post_process_restored_tower( + restored_tower, + &validator_identity, + &vote_account, + &config, + &ledger_path, + &bank_forks, + ); + + info!("Tower state: {:?}", tower); + leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone()); bank_forks.set_snapshot_config(config.snapshot_config.clone()); @@ -704,6 +802,7 @@ fn new_banks_from_ledger( leader_schedule_cache, snapshot_hash, transaction_history_services, + tower, ) } diff --git 
a/ledger/src/ancestor_iterator.rs b/ledger/src/ancestor_iterator.rs index 0c01757d6..6c8099ce9 100644 --- a/ledger/src/ancestor_iterator.rs +++ b/ledger/src/ancestor_iterator.rs @@ -20,6 +20,13 @@ impl<'a> AncestorIterator<'a> { blockstore, } } + + pub fn new_inclusive(start_slot: Slot, blockstore: &'a Blockstore) -> Self { + Self { + current: blockstore.meta(start_slot).unwrap().map(|_| start_slot), + blockstore, + } + } } impl<'a> Iterator for AncestorIterator<'a> { type Item = Slot; @@ -111,4 +118,33 @@ mod tests { vec![2, 1, 0] ); } + + #[test] + fn test_ancestor_iterator_inclusive() { + let blockstore_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + + let (shreds, _) = make_slot_entries(0, 0, 42); + blockstore.insert_shreds(shreds, None, false).unwrap(); + let (shreds, _) = make_slot_entries(1, 0, 42); + blockstore.insert_shreds(shreds, None, false).unwrap(); + let (shreds, _) = make_slot_entries(2, 1, 42); + blockstore.insert_shreds(shreds, None, false).unwrap(); + + assert_eq!( + AncestorIterator::new(2, &blockstore).collect::>(), + vec![1, 0] + ); + // existing start_slot + assert_eq!( + AncestorIterator::new_inclusive(2, &blockstore).collect::>(), + vec![2, 1, 0] + ); + + // non-existing start_slot + assert_eq!( + AncestorIterator::new_inclusive(3, &blockstore).collect::>(), + vec![] as Vec + ); + } } diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index f6fecd33e..b7a944aa8 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -9,7 +9,7 @@ use solana_client::{ use solana_core::{ broadcast_stage::BroadcastStageType, cluster_info::VALIDATOR_PORT_RANGE, - consensus::{SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, + consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, gossip_service::discover_cluster, validator::ValidatorConfig, }; @@ -1370,18 +1370,19 @@ fn test_no_voting() { } #[test] -fn test_optimistic_confirmation_violation() { +#[serial] +fn test_optimistic_confirmation_violation_with_no_tower() { solana_logger::setup(); let mut buf = BufferRedirect::stderr().unwrap(); // First set up the cluster with 2 nodes let slots_per_epoch = 2048; - let node_stakes = vec![50, 51]; + let node_stakes = vec![51, 50]; let validator_keys: Vec<_> = iter::repeat_with(|| (Arc::new(Keypair::new()), true)) .take(node_stakes.len()) .collect(); let config = ClusterConfig { cluster_lamports: 100_000, - node_stakes: vec![51, 50], + node_stakes: node_stakes.clone(), validator_configs: vec![ValidatorConfig::default(); node_stakes.len()], validator_keys: Some(validator_keys), slots_per_epoch, @@ -1415,7 +1416,9 @@ fn test_optimistic_confirmation_violation() { // Mark fork as dead on the heavier validator, this should make the fork effectively // dead, even though it was optimistically confirmed. 
The smaller validator should - // jump over to the new fork + // create and jump over to a new fork + // Also, remove saved tower to intentionally make the restarted validator violate the + // optimistic confirmation { let blockstore = Blockstore::open_with_access_type( &exited_validator_info.info.ledger_path, AccessType::PrimaryOnly, None, ) .unwrap_or_else(|e| { panic!( "Failed to open ledger at {:?}, err: {}", exited_validator_info.info.ledger_path, e ); }); info!( "Setting slot: {} on main fork as dead, should cause fork", prev_voted_slot ); blockstore.set_dead_slot(prev_voted_slot).unwrap(); + + std::fs::remove_file(Tower::get_filename( + &exited_validator_info.info.ledger_path, + &entry_point_id, + )) + .unwrap(); } cluster.restart_node(&entry_point_id, exited_validator_info); @@ -1465,6 +1474,220 @@ fn test_optimistic_confirmation_violation() { assert!(output.contains(&expected_log)); } +#[test] +#[serial] +#[ignore] +fn test_no_optimistic_confirmation_violation_with_tower() { + solana_logger::setup(); + let mut buf = BufferRedirect::stderr().unwrap(); + + // First set up the cluster with 2 nodes + let slots_per_epoch = 2048; + let node_stakes = vec![51, 50]; + let validator_keys: Vec<_> = iter::repeat_with(|| (Arc::new(Keypair::new()), true)) + .take(node_stakes.len()) + .collect(); + let config = ClusterConfig { + cluster_lamports: 100_000, + node_stakes: node_stakes.clone(), + validator_configs: vec![ValidatorConfig::default(); node_stakes.len()], + validator_keys: Some(validator_keys), + slots_per_epoch, + stakers_slot_offset: slots_per_epoch, + skip_warmup_slots: true, + ..ClusterConfig::default() + }; + let mut cluster = LocalCluster::new(&config); + let entry_point_id = cluster.entry_point_info.id; + // Let the nodes run for a while. Wait for validators to vote on slot `S` + // so that the vote on `S-1` is definitely in gossip and optimistic confirmation is + // detected on slot `S-1` for sure, then stop the heavier of the two + // validators + let client = cluster.get_validator_client(&entry_point_id).unwrap(); + let mut prev_voted_slot = 0; + loop { + let last_voted_slot = client + .get_slot_with_commitment(CommitmentConfig::recent()) + .unwrap(); + if last_voted_slot > 50 { + if prev_voted_slot == 0 { + prev_voted_slot = last_voted_slot; + } else { + break; + } + } + sleep(Duration::from_millis(100)); + } + + let exited_validator_info = cluster.exit_node(&entry_point_id); + + // Mark fork as dead on the heavier validator, this should make the fork effectively + // dead, even though it was optimistically confirmed.
The smaller validator should + // create and jump over to a new fork + { + let blockstore = Blockstore::open_with_access_type( + &exited_validator_info.info.ledger_path, + AccessType::PrimaryOnly, + None, + ) + .unwrap_or_else(|e| { + panic!( + "Failed to open ledger at {:?}, err: {}", + exited_validator_info.info.ledger_path, e + ); + }); + info!( + "Setting slot: {} on main fork as dead, should cause fork", + prev_voted_slot + ); + blockstore.set_dead_slot(prev_voted_slot).unwrap(); + } + cluster.restart_node(&entry_point_id, exited_validator_info); + + cluster.check_no_new_roots(400, "test_no_optimistic_confirmation_violation_with_tower"); + + // Check that the validator did not detect that optimistic confirmation for + // `prev_voted_slot` failed + let expected_log = format!("Optimistic slot {} was not rooted", prev_voted_slot); + let mut output = String::new(); + buf.read_to_string(&mut output).unwrap(); + assert!(!output.contains(&expected_log)); +} + +#[test] +#[serial] +fn test_validator_saves_tower() { + solana_logger::setup(); + + let validator_config = ValidatorConfig { + require_tower: true, + ..ValidatorConfig::default() + }; + let validator_identity_keypair = Arc::new(Keypair::new()); + let validator_id = validator_identity_keypair.pubkey(); + let config = ClusterConfig { + cluster_lamports: 10_000, + node_stakes: vec![100], + validator_configs: vec![validator_config], + validator_keys: Some(vec![(validator_identity_keypair.clone(), true)]), + ..ClusterConfig::default() + }; + let mut cluster = LocalCluster::new(&config); + + let validator_client = cluster.get_validator_client(&validator_id).unwrap(); + + let ledger_path = cluster + .validators + .get(&validator_id) + .unwrap() + .info + .ledger_path + .clone(); + + // Wait for some votes to be generated + let mut last_replayed_root; + loop { + if let Ok(slot) = validator_client.get_slot_with_commitment(CommitmentConfig::recent()) { + trace!("current slot: {}", slot); + if slot > 2 { + // this will be the root next time a validator starts + last_replayed_root = slot; + break; + } + } + sleep(Duration::from_millis(10)); + } + + // Stop validator and check saved tower + let validator_info = cluster.exit_node(&validator_id); + let tower1 = Tower::restore(&ledger_path, &validator_id).unwrap(); + trace!("tower1: {:?}", tower1); + assert_eq!(tower1.root(), Some(0)); + + // Restart the validator and wait for a new root + cluster.restart_node(&validator_id, validator_info); + let validator_client = cluster.get_validator_client(&validator_id).unwrap(); + + // Wait for the first root + loop { + if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) { + trace!("current root: {}", root); + if root > last_replayed_root + 1 { + last_replayed_root = root; + break; + } + } + sleep(Duration::from_millis(50)); + } + + // Stop validator, and check saved tower + let recent_slot = validator_client + .get_slot_with_commitment(CommitmentConfig::recent()) + .unwrap(); + let validator_info = cluster.exit_node(&validator_id); + let tower2 = Tower::restore(&ledger_path, &validator_id).unwrap(); + trace!("tower2: {:?}", tower2); + assert_eq!(tower2.root(), Some(last_replayed_root)); + last_replayed_root = recent_slot; + + // Rollback saved tower to `tower1` to simulate a validator starting from a newer snapshot + // without having to wait for that snapshot to be generated in this test + tower1.save(&validator_identity_keypair).unwrap(); + + cluster.restart_node(&validator_id, validator_info); + let validator_client =
cluster.get_validator_client(&validator_id).unwrap(); + + // Wait for a new root, demonstrating the validator was able to make progress from the older `tower1` + loop { + if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) { + trace!( + "current root: {}, last_replayed_root: {}", + root, + last_replayed_root + ); + if root > last_replayed_root { + break; + } + } + sleep(Duration::from_millis(50)); + } + + // Check the new root is reflected in the saved tower state + let mut validator_info = cluster.exit_node(&validator_id); + let tower3 = Tower::restore(&ledger_path, &validator_id).unwrap(); + trace!("tower3: {:?}", tower3); + assert!(tower3.root().unwrap() > last_replayed_root); + + // Remove the tower file entirely and allow the validator to start without a tower. It will + // rebuild the tower from its vote account contents + fs::remove_file(Tower::get_filename(&ledger_path, &validator_id)).unwrap(); + validator_info.config.require_tower = false; + + cluster.restart_node(&validator_id, validator_info); + let validator_client = cluster.get_validator_client(&validator_id).unwrap(); + + // Wait for a couple more slots to pass so another vote occurs + let current_slot = validator_client + .get_slot_with_commitment(CommitmentConfig::recent()) + .unwrap(); + loop { + if let Ok(slot) = validator_client.get_slot_with_commitment(CommitmentConfig::recent()) { + trace!("current_slot: {}, slot: {}", current_slot, slot); + if slot > current_slot + 1 { + break; + } + } + sleep(Duration::from_millis(50)); + } + + cluster.close_preserve_ledgers(); + + let tower4 = Tower::restore(&ledger_path, &validator_id).unwrap(); + trace!("tower4: {:?}", tower4); + // tower4's root should have advanced exactly one slot past tower3's root + assert_eq!(tower4.root(), tower3.root().map(|s| s + 1)); +} + fn wait_for_next_snapshot( cluster: &LocalCluster, snapshot_package_output_path: &Path, diff --git a/multinode-demo/bootstrap-validator.sh b/multinode-demo/bootstrap-validator.sh index bb7b007c8..87ce93eae 100755 --- a/multinode-demo/bootstrap-validator.sh +++ b/multinode-demo/bootstrap-validator.sh @@ -93,6 +93,7 @@ ledger_dir="$SOLANA_CONFIG_DIR"/bootstrap-validator args+=( --enable-rpc-exit --enable-rpc-set-log-filter + --require-tower --ledger "$ledger_dir" --rpc-port 8899 --snapshot-interval-slots 200 diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index 2cebf2a7f..b95c4f1c1 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -228,6 +228,7 @@ default_arg --ledger "$ledger_dir" default_arg --log - default_arg --enable-rpc-exit default_arg --enable-rpc-set-log-filter +default_arg --require-tower if [[ -n $SOLANA_CUDA ]]; then program=$solana_validator_cuda diff --git a/programs/vote/src/vote_state/mod.rs b/programs/vote/src/vote_state/mod.rs index eeb9fa841..bf2cc92c9 100644 --- a/programs/vote/src/vote_state/mod.rs +++ b/programs/vote/src/vote_state/mod.rs @@ -163,6 +163,9 @@ pub struct VoteState { pub commission: u8, pub votes: VecDeque, + + // This is usually the last Lockout which was popped from self.votes. + // However, it can be an arbitrary slot when used inside Tower pub root_slot: Option, /// the signer for vote transactions diff --git a/run.sh b/run.sh index 74deb4b35..3d2442255 100755 --- a/run.sh +++ b/run.sh @@ -104,6 +104,7 @@ args=( --enable-rpc-exit --enable-rpc-transaction-history --init-complete-file "$dataDir"/init-completed + --require-tower ) solana-validator "${args[@]}" & validator=$!
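For reference, the save/restore round trip that the test_validator_saves_tower test above drives through a full local cluster reduces to a handful of Tower calls. The following is a minimal sketch, not part of the patch: it assumes a tower that was originally created against ledger_path (for example via Tower::new_from_bankforks) and an identity_keypair matching the node identity from which the tower file name is derived.

    use solana_core::consensus::Tower;
    use solana_sdk::signature::{Keypair, Signer};
    use std::path::Path;

    // Sketch only (not part of the patch): round-trip a tower through its on-disk form.
    // Assumes `tower` was created with `ledger_path` as its save location.
    fn tower_round_trip(tower: &Tower, ledger_path: &Path, identity_keypair: &Keypair) {
        // Persist the tower; the file name is derived from the node identity.
        tower.save(identity_keypair).unwrap();
        assert!(Tower::get_filename(ledger_path, &identity_keypair.pubkey()).exists());

        // Reload it and confirm the root survived the round trip.
        let restored = Tower::restore(ledger_path, &identity_keypair.pubkey()).unwrap();
        assert_eq!(restored.root(), tower.root());
    }

The test above exercises the same calls against a live validator, including the path where the file has been removed and the tower is rebuilt from the vote account.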
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 9ed39cb95..eb783ece0 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -913,6 +913,10 @@ impl Bank { }); } + pub fn get_slot_history(&self) -> SlotHistory { + SlotHistory::from_account(&self.get_account(&sysvar::slot_history::id()).unwrap()).unwrap() + } + fn update_epoch_stakes(&mut self, leader_schedule_epoch: Epoch) { // update epoch_stakes cache // if my parent didn't populate for this staker's epoch, we've diff --git a/sdk/src/slot_history.rs b/sdk/src/slot_history.rs index 4765fab70..293ae38e1 100644 --- a/sdk/src/slot_history.rs +++ b/sdk/src/slot_history.rs @@ -63,7 +63,7 @@ impl SlotHistory { } pub fn check(&self, slot: Slot) -> Check { - if slot >= self.next_slot { + if slot > self.newest() { Check::Future } else if slot < self.oldest() { Check::TooOld @@ -77,6 +77,10 @@ impl SlotHistory { pub fn oldest(&self) -> Slot { self.next_slot.saturating_sub(MAX_ENTRIES) } + + pub fn newest(&self) -> Slot { + self.next_slot - 1 + } } #[cfg(test)] diff --git a/validator/src/main.rs b/validator/src/main.rs index 91bee0079..b6fa88b40 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -793,6 +793,12 @@ pub fn main() { .takes_value(false) .help("Use CUDA"), ) + .arg( + clap::Arg::with_name("require_tower") + .long("require-tower") + .takes_value(false) + .help("Refuse to start if saved tower state is not found"), + ) .arg( Arg::with_name("expected_genesis_hash") .long("expected-genesis-hash") @@ -1015,6 +1021,7 @@ pub fn main() { let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode"); let mut validator_config = ValidatorConfig { + require_tower: matches.is_present("require_tower"), dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(), expected_genesis_hash: matches .value_of("expected_genesis_hash")
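The slot_history change above re-expresses the Future boundary through the new newest() helper (next_slot - 1) rather than comparing against next_slot directly; the boundary itself is unchanged, since slot > next_slot - 1 is equivalent to slot >= next_slot. A minimal sketch of how check() classifies slots after this change, assuming SlotHistory::default() starts with slot 0 recorded and that Check also carries Found and NotFound variants for in-range slots:

    use solana_sdk::slot_history::{Check, SlotHistory};

    // Sketch only (not part of the patch): check() relative to newest()/oldest().
    fn main() {
        let mut slot_history = SlotHistory::default(); // assumed: slot 0 recorded, next_slot == 1
        slot_history.add(5);                           // next_slot becomes 6

        assert_eq!(slot_history.newest(), 5);                     // next_slot - 1
        assert!(matches!(slot_history.check(6), Check::Future));  // strictly newer than newest()
        assert!(matches!(slot_history.check(5), Check::Found));   // newest() itself is in range
        assert!(matches!(slot_history.check(3), Check::NotFound)); // in range but never added
    }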