diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 8c2965faf7..92456ae1c9 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1,9 +1,10 @@ +use crate::tower1_7_14::Tower1_7_14; use { crate::{ heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, progress_map::{LockoutIntervals, ProgressMap}, - tower_storage::{SavedTower, TowerStorage}, + tower_storage::{SavedTower, SavedTowerVersions, TowerStorage}, }, chrono::prelude::*, solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db}, @@ -13,6 +14,7 @@ use { }, solana_sdk::{ clock::{Slot, UnixTimestamp}, + feature_set, hash::Hash, instruction::Instruction, pubkey::Pubkey, @@ -21,7 +23,10 @@ use { }, solana_vote_program::{ vote_instruction, - vote_state::{BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY}, + vote_state::{ + BlockTimestamp, Lockout, Vote, VoteState, VoteStateUpdate, VoteTransaction, + MAX_LOCKOUT_HISTORY, + }, }, std::{ cmp::Ordering, @@ -45,29 +50,43 @@ pub enum SwitchForkDecision { impl SwitchForkDecision { pub fn to_vote_instruction( &self, - vote: Vote, + vote: VoteTransaction, vote_account_pubkey: &Pubkey, authorized_voter_pubkey: &Pubkey, ) -> Option { - match self { - SwitchForkDecision::FailedSwitchThreshold(_, total_stake) => { + match (self, vote) { + (SwitchForkDecision::FailedSwitchThreshold(_, total_stake), _) => { assert_ne!(*total_stake, 0); None } - SwitchForkDecision::FailedSwitchDuplicateRollback(_) => None, - SwitchForkDecision::SameFork => Some(vote_instruction::vote( - vote_account_pubkey, - authorized_voter_pubkey, - vote, - )), - SwitchForkDecision::SwitchProof(switch_proof_hash) => { + (SwitchForkDecision::FailedSwitchDuplicateRollback(_), _) => None, + (SwitchForkDecision::SameFork, VoteTransaction::Vote(v)) => Some( + vote_instruction::vote(vote_account_pubkey, authorized_voter_pubkey, v), + ), + (SwitchForkDecision::SameFork, VoteTransaction::VoteStateUpdate(v)) => { + Some(vote_instruction::update_vote_state( + vote_account_pubkey, + authorized_voter_pubkey, + v, + )) + } + (SwitchForkDecision::SwitchProof(switch_proof_hash), VoteTransaction::Vote(v)) => { Some(vote_instruction::vote_switch( vote_account_pubkey, authorized_voter_pubkey, - vote, + v, *switch_proof_hash, )) } + ( + SwitchForkDecision::SwitchProof(switch_proof_hash), + VoteTransaction::VoteStateUpdate(v), + ) => Some(vote_instruction::update_vote_state_switch( + vote_account_pubkey, + authorized_voter_pubkey, + v, + *switch_proof_hash, + )), } } @@ -102,14 +121,47 @@ pub(crate) struct ComputedBankState { pub my_latest_landed_vote: Option, } -#[frozen_abi(digest = "GMs1FxKteU7K4ZFRofMBqNhBpM4xkPVxfYod6R8DQmpT")] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +pub enum TowerVersions { + V1_17_14(Tower1_7_14), + Current(Tower), +} + +impl TowerVersions { + pub fn new_current(tower: Tower) -> Self { + Self::Current(tower) + } + + pub fn convert_to_current(self) -> Tower { + match self { + TowerVersions::V1_17_14(tower) => { + let box_last_vote = VoteTransaction::from(tower.last_vote.clone()); + + Tower { + node_pubkey: tower.node_pubkey, + threshold_depth: tower.threshold_depth, + threshold_size: tower.threshold_size, + vote_state: tower.vote_state, + last_vote: box_last_vote, + last_vote_tx_blockhash: tower.last_vote_tx_blockhash, + last_timestamp: tower.last_timestamp, + stray_restored_slot: tower.stray_restored_slot, + last_switch_threshold_check: 
tower.last_switch_threshold_check, + } + } + TowerVersions::Current(tower) => tower, + } + } +} + +#[frozen_abi(digest = "BfeSJNsfQeX6JU7dmezv1s1aSvR5SoyxKRRZ4ubTh2mt")] #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)] pub struct Tower { pub node_pubkey: Pubkey, threshold_depth: usize, threshold_size: f64, - vote_state: VoteState, - last_vote: Vote, + pub(crate) vote_state: VoteState, + last_vote: VoteTransaction, #[serde(skip)] // The blockhash used in the last vote transaction, may or may not equal the // blockhash of the voted block itself, depending if the vote slot was refreshed. @@ -136,7 +188,7 @@ impl Default for Tower { threshold_depth: VOTE_THRESHOLD_DEPTH, threshold_size: VOTE_THRESHOLD_SIZE, vote_state: VoteState::default(), - last_vote: Vote::default(), + last_vote: VoteTransaction::from(VoteStateUpdate::default()), last_timestamp: BlockTimestamp::default(), last_vote_tx_blockhash: Hash::default(), stray_restored_slot: Option::default(), @@ -359,12 +411,18 @@ impl Tower { self.last_vote_tx_blockhash = new_vote_tx_blockhash; } + // Returns true if we have switched the new vote instruction that directly sets vote state + pub(crate) fn is_direct_vote_state_update_enabled(bank: &Bank) -> bool { + bank.feature_set + .is_active(&feature_set::allow_votes_to_directly_update_vote_state::id()) + } + fn apply_vote_and_generate_vote_diff( local_vote_state: &mut VoteState, slot: Slot, hash: Hash, last_voted_slot_in_bank: Option, - ) -> Vote { + ) -> VoteTransaction { let vote = Vote::new(vec![slot], hash); local_vote_state.process_vote_unchecked(vote); let slots = if let Some(last_voted_slot) = last_voted_slot_in_bank { @@ -377,7 +435,7 @@ impl Tower { } else { local_vote_state.votes.iter().map(|v| v.slot).collect() }; - Vote::new(slots, hash) + VoteTransaction::from(Vote::new(slots, hash)) } pub fn last_voted_slot_in_bank(bank: &Bank, vote_account_pubkey: &Pubkey) -> Option { @@ -391,7 +449,12 @@ impl Tower { // Returns the new root if one is made after applying a vote for the given bank to // `self.vote_state` - self.record_bank_vote_and_update_lockouts(bank.slot(), bank.hash(), last_voted_slot_in_bank) + self.record_bank_vote_and_update_lockouts( + bank.slot(), + bank.hash(), + last_voted_slot_in_bank, + Self::is_direct_vote_state_update_enabled(bank), + ) } fn record_bank_vote_and_update_lockouts( @@ -399,18 +462,29 @@ impl Tower { vote_slot: Slot, vote_hash: Hash, last_voted_slot_in_bank: Option, + is_direct_vote_state_update_enabled: bool, ) -> Option { trace!("{} record_vote for {}", self.node_pubkey, vote_slot); let old_root = self.root(); - let mut new_vote = Self::apply_vote_and_generate_vote_diff( - &mut self.vote_state, - vote_slot, - vote_hash, - last_voted_slot_in_bank, - ); - new_vote.timestamp = - self.maybe_timestamp(self.last_vote.slots.last().copied().unwrap_or_default()); + let mut new_vote = if is_direct_vote_state_update_enabled { + let vote = Vote::new(vec![vote_slot], vote_hash); + self.vote_state.process_vote_unchecked(vote); + VoteTransaction::from(VoteStateUpdate::new( + self.vote_state.votes.clone(), + self.vote_state.root_slot, + vote_hash, + )) + } else { + Self::apply_vote_and_generate_vote_diff( + &mut self.vote_state, + vote_slot, + vote_hash, + last_voted_slot_in_bank, + ) + }; + + new_vote.set_timestamp(self.maybe_timestamp(self.last_voted_slot().unwrap_or_default())); self.last_vote = new_vote; let new_root = self.root(); @@ -429,7 +503,7 @@ impl Tower { #[cfg(test)] pub fn record_vote(&mut self, slot: Slot, hash: Hash) -> Option { 
- self.record_bank_vote_and_update_lockouts(slot, hash, self.last_voted_slot()) + self.record_bank_vote_and_update_lockouts(slot, hash, self.last_voted_slot(), true) } /// Used for tests @@ -440,18 +514,22 @@ impl Tower { } pub fn last_voted_slot(&self) -> Option { - self.last_vote.slots.last().copied() + if self.last_vote.is_empty() { + None + } else { + Some(self.last_vote.slot(self.last_vote.len() - 1)) + } } pub fn last_voted_slot_hash(&self) -> Option<(Slot, Hash)> { - Some((*self.last_vote.slots.last()?, self.last_vote.hash)) + Some((self.last_voted_slot()?, self.last_vote.hash())) } pub fn stray_restored_slot(&self) -> Option { self.stray_restored_slot } - pub fn last_vote(&self) -> Vote { + pub fn last_vote(&self) -> VoteTransaction { self.last_vote.clone() } @@ -482,12 +560,17 @@ impl Tower { self.vote_state.root_slot.unwrap() } - // a slot is recent if it's newer than the last vote we have + // a slot is recent if it's newer than the last vote we have. If we haven't voted yet + // but have a root (hard forks situation) then comparre it to the root pub fn is_recent(&self, slot: Slot) -> bool { if let Some(last_voted_slot) = self.vote_state.last_voted_slot() { if slot <= last_voted_slot { return false; } + } else if let Some(root) = self.vote_state.root_slot { + if slot <= root { + return false; + } } true } @@ -583,9 +666,9 @@ impl Tower { // `switch < last` is needed not to warn! this message just because of using // newer snapshots on validator restart let message = format!( - "bank_forks doesn't have corresponding data for the stray restored \ + "bank_forks doesn't have corresponding data for the stray restored \ last vote({}), meaning some inconsistency between saved tower and ledger.", - last_voted_slot + last_voted_slot ); warn!("{}", message); datapoint_warn!("tower_warn", ("warn", message, String)); @@ -627,42 +710,56 @@ impl Tower { // TODO: Handle if the last vote is on a dupe, and then we restart. The dupe won't be in // heaviest_subtree_fork_choice, so `heaviest_subtree_fork_choice.latest_invalid_ancestor()` will return // None, but the last vote will be persisted in tower. - let switch_hash = progress.get_hash(switch_slot).expect("Slot we're trying to switch to must exist AND be frozen in progress map"); - if let Some(latest_duplicate_ancestor) = heaviest_subtree_fork_choice.latest_invalid_ancestor(&(last_voted_slot, last_voted_hash)) { + let switch_hash = progress + .get_hash(switch_slot) + .expect("Slot we're trying to switch to must exist AND be frozen in progress map"); + if let Some(latest_duplicate_ancestor) = heaviest_subtree_fork_choice + .latest_invalid_ancestor(&(last_voted_slot, last_voted_hash)) + { // We're rolling back because one of the ancestors of the last vote was a duplicate. In this // case, it's acceptable if the switch candidate is one of ancestors of the previous vote, // just fail the switch check because there's no point in voting on an ancestor. ReplayStage // should then have a special case continue building an alternate fork from this ancestor, NOT // the `last_voted_slot`. This is in contrast to usual SwitchFailure where ReplayStage continues to build blocks // on latest vote. See `ReplayStage::select_vote_and_reset_forks()` for more details. 
- if heaviest_subtree_fork_choice.is_strict_ancestor(&(switch_slot, switch_hash), &(last_voted_slot, last_voted_hash)) { + if heaviest_subtree_fork_choice.is_strict_ancestor( + &(switch_slot, switch_hash), + &(last_voted_slot, last_voted_hash), + ) { return rollback_due_to_to_to_duplicate_ancestor(latest_duplicate_ancestor); - } else if progress.get_hash(last_voted_slot).map(|current_slot_hash| current_slot_hash != last_voted_hash).unwrap_or(true) { + } else if progress + .get_hash(last_voted_slot) + .map(|current_slot_hash| current_slot_hash != last_voted_hash) + .unwrap_or(true) + { // Our last vote slot was purged because it was on a duplicate fork, don't continue below // where checks may panic. We allow a freebie vote here that may violate switching // thresholds // TODO: Properly handle this case - info!("Allowing switch vote on {:?} because last vote {:?} was rolled back", (switch_slot, switch_hash), (last_voted_slot, last_voted_hash)); + info!( + "Allowing switch vote on {:?} because last vote {:?} was rolled back", + (switch_slot, switch_hash), + (last_voted_slot, last_voted_hash) + ); return SwitchForkDecision::SwitchProof(Hash::default()); } } - let last_vote_ancestors = - ancestors.get(&last_voted_slot).unwrap_or_else(|| { - if self.is_stray_last_vote() { - // Unless last vote is stray and stale, ancestors.get(last_voted_slot) must - // return Some(_), justifying to panic! here. - // Also, adjust_lockouts_after_replay() correctly makes last_voted_slot None, - // if all saved votes are ancestors of replayed_root_slot. So this code shouldn't be - // touched in that case as well. - // In other words, except being stray, all other slots have been voted on while - // this validator has been running, so we must be able to fetch ancestors for - // all of them. - empty_ancestors_due_to_minor_unsynced_ledger() - } else { - panic!("no ancestors found with slot: {}", last_voted_slot); - } - }); + let last_vote_ancestors = ancestors.get(&last_voted_slot).unwrap_or_else(|| { + if self.is_stray_last_vote() { + // Unless last vote is stray and stale, ancestors.get(last_voted_slot) must + // return Some(_), justifying to panic! here. + // Also, adjust_lockouts_after_replay() correctly makes last_voted_slot None, + // if all saved votes are ancestors of replayed_root_slot. So this code shouldn't be + // touched in that case as well. + // In other words, except being stray, all other slots have been voted on while + // this validator has been running, so we must be able to fetch ancestors for + // all of them. + empty_ancestors_due_to_minor_unsynced_ledger() + } else { + panic!("no ancestors found with slot: {}", last_voted_slot); + } + }); let switch_slot_ancestors = ancestors.get(&switch_slot).unwrap(); @@ -734,22 +831,76 @@ impl Tower { // Find any locked out intervals for vote accounts in this bank with // `lockout_interval_end` >= `last_vote`, which implies they are locked out at // `last_vote` on another fork. 
- for (_lockout_interval_end, intervals_keyed_by_end) in lockout_intervals.range((Included(last_voted_slot), Unbounded)) { - for (lockout_interval_start, vote_account_pubkey) in intervals_keyed_by_end { - if locked_out_vote_accounts.contains(vote_account_pubkey) { - continue; - } + for (_lockout_interval_end, intervals_keyed_by_end) in + lockout_intervals.range((Included(last_voted_slot), Unbounded)) + { + for (lockout_interval_start, vote_account_pubkey) in intervals_keyed_by_end { + if locked_out_vote_accounts.contains(vote_account_pubkey) { + continue; + } - // Only count lockouts on slots that are: - // 1) Not ancestors of `last_vote`, meaning being on different fork - // 2) Not from before the current root as we can't determine if - // anything before the root was an ancestor of `last_vote` or not - if !last_vote_ancestors.contains(lockout_interval_start) - // Given a `lockout_interval_start` < root that appears in a - // bank for a `candidate_slot`, it must be that `lockout_interval_start` - // is an ancestor of the current root, because `candidate_slot` is a - // descendant of the current root - && *lockout_interval_start > root + // Only count lockouts on slots that are: + // 1) Not ancestors of `last_vote`, meaning being on different fork + // 2) Not from before the current root as we can't determine if + // anything before the root was an ancestor of `last_vote` or not + if !last_vote_ancestors.contains(lockout_interval_start) + // Given a `lockout_interval_start` < root that appears in a + // bank for a `candidate_slot`, it must be that `lockout_interval_start` + // is an ancestor of the current root, because `candidate_slot` is a + // descendant of the current root + && *lockout_interval_start > root + { + let stake = epoch_vote_accounts + .get(vote_account_pubkey) + .map(|(stake, _)| *stake) + .unwrap_or(0); + locked_out_stake += stake; + if (locked_out_stake as f64 / total_stake as f64) + > SWITCH_FORK_THRESHOLD + { + return SwitchForkDecision::SwitchProof(switch_proof); + } + locked_out_vote_accounts.insert(vote_account_pubkey); + } + } + } + } + + // Check the latest votes for potentially gossip votes that haven't landed yet + for ( + vote_account_pubkey, + (candidate_latest_frozen_vote, _candidate_latest_frozen_vote_hash), + ) in latest_validator_votes_for_frozen_banks.max_gossip_frozen_votes() + { + if locked_out_vote_accounts.contains(&vote_account_pubkey) { + continue; + } + + if *candidate_latest_frozen_vote > last_voted_slot + && + // Because `candidate_latest_frozen_vote` is the last vote made by some validator + // in the cluster for a frozen bank `B` observed through gossip, we may have cleared + // that frozen bank `B` because we `set_root(root)` for a `root` on a different fork, + // like so: + // + // |----------X ------candidate_latest_frozen_vote (frozen) + // old root + // |----------new root ----last_voted_slot + // + // In most cases, because `last_voted_slot` must be a descendant of `root`, then + // if `candidate_latest_frozen_vote` is not found in the ancestors/descendants map (recall these + // directly reflect the state of BankForks), this implies that `B` was pruned from BankForks + // because it was on a different fork than `last_voted_slot`, and thus this vote for `candidate_latest_frozen_vote` + // should be safe to count towards the switching proof: + // + // However, there is also the possibility that `last_voted_slot` is a stray, in which + // case we cannot make this conclusion as we do not know the ancestors/descendants + // of strays. 
Hence we err on the side of caution here and ignore this vote. This + // is ok because validators voting on different unrooted forks should eventually vote + // on some descendant of the root, at which time they can be included in switching proofs. + !Self::is_candidate_slot_descendant_of_last_vote( + *candidate_latest_frozen_vote, last_voted_slot, ancestors) + .unwrap_or(true) { let stake = epoch_vote_accounts .get(vote_account_pubkey) @@ -761,58 +912,13 @@ impl Tower { } locked_out_vote_accounts.insert(vote_account_pubkey); } - } - } - } - - // Check the latest votes for potentially gossip votes that haven't landed yet - for (vote_account_pubkey, (candidate_latest_frozen_vote, _candidate_latest_frozen_vote_hash)) in latest_validator_votes_for_frozen_banks.max_gossip_frozen_votes() { - if locked_out_vote_accounts.contains(&vote_account_pubkey) { - continue; - } - - if *candidate_latest_frozen_vote > last_voted_slot - && - // Because `candidate_latest_frozen_vote` is the last vote made by some validator - // in the cluster for a frozen bank `B` observed through gossip, we may have cleared - // that frozen bank `B` because we `set_root(root)` for a `root` on a different fork, - // like so: - // - // |----------X ------candidate_latest_frozen_vote (frozen) - // old root - // |----------new root ----last_voted_slot - // - // In most cases, because `last_voted_slot` must be a descendant of `root`, then - // if `candidate_latest_frozen_vote` is not found in the ancestors/descendants map (recall these - // directly reflect the state of BankForks), this implies that `B` was pruned from BankForks - // because it was on a different fork than `last_voted_slot`, and thus this vote for `candidate_latest_frozen_vote` - // should be safe to count towards the switching proof: - // - // However, there is also the possibility that `last_voted_slot` is a stray, in which - // case we cannot make this conclusion as we do not know the ancestors/descendants - // of strays. Hence we err on the side of caution here and ignore this vote. This - // is ok because validators voting on different unrooted forks should eventually vote - // on some descendant of the root, at which time they can be included in switching proofs. 
- !Self::is_candidate_slot_descendant_of_last_vote( - *candidate_latest_frozen_vote, last_voted_slot, ancestors) - .unwrap_or(true) { - let stake = epoch_vote_accounts - .get(vote_account_pubkey) - .map(|(stake, _)| *stake) - .unwrap_or(0); - locked_out_stake += stake; - if (locked_out_stake as f64 / total_stake as f64) > SWITCH_FORK_THRESHOLD { - return SwitchForkDecision::SwitchProof(switch_proof); - } - locked_out_vote_accounts.insert(vote_account_pubkey); - } } // We have not detected sufficient lockout past the last voted slot to generate // a switching proof SwitchForkDecision::FailedSwitchThreshold(locked_out_stake, total_stake) }) - .unwrap_or(SwitchForkDecision::SameFork) + .unwrap_or(SwitchForkDecision::SameFork) } #[allow(clippy::too_many_arguments)] @@ -933,13 +1039,7 @@ impl Tower { } pub fn is_stray_last_vote(&self) -> bool { - if let Some(last_voted_slot) = self.last_voted_slot() { - if let Some(stray_restored_slot) = self.stray_restored_slot { - return stray_restored_slot == last_voted_slot; - } - } - - false + self.stray_restored_slot == self.last_voted_slot() } // The tower root can be older/newer if the validator booted from a newer/older snapshot, so @@ -961,8 +1061,10 @@ impl Tower { assert_eq!(slot_history.check(replayed_root), Check::Found); assert!( - self.last_vote == Vote::default() && self.vote_state.votes.is_empty() - || self.last_vote != Vote::default() && !self.vote_state.votes.is_empty(), + self.last_vote == VoteTransaction::from(VoteStateUpdate::default()) + && self.vote_state.votes.is_empty() + || self.last_vote != VoteTransaction::from(VoteStateUpdate::default()) + && !self.vote_state.votes.is_empty(), "last vote: {:?} vote_state.votes: {:?}", self.last_vote, self.vote_state.votes @@ -1116,7 +1218,7 @@ impl Tower { info!("All restored votes were behind; resetting root_slot and last_vote in tower!"); // we might not have banks for those votes so just reset. 
// That's because the votes may well past replayed_root - self.last_vote = Vote::default(); + self.last_vote = VoteTransaction::from(Vote::default()); } else { info!( "{} restored votes (out of {}) were on different fork or are upcoming votes on unrooted slots: {:?}!", @@ -1125,11 +1227,8 @@ impl Tower { self.voted_slots() ); - assert_eq!( - self.last_vote.slots.last().unwrap(), - self.voted_slots().last().unwrap() - ); - self.stray_restored_slot = Some(*self.last_vote.slots.last().unwrap()); + assert_eq!(self.last_voted_slot(), self.voted_slots().last().copied()); + self.stray_restored_slot = self.last_vote.last_voted_slot() } Ok(()) @@ -1176,13 +1275,12 @@ impl Tower { pub fn save(&self, tower_storage: &dyn TowerStorage, node_keypair: &Keypair) -> Result<()> { let saved_tower = SavedTower::new(self, node_keypair)?; - tower_storage.store(&saved_tower)?; + tower_storage.store(&SavedTowerVersions::from(saved_tower))?; Ok(()) } pub fn restore(tower_storage: &dyn TowerStorage, node_pubkey: &Pubkey) -> Result { - let saved_tower = tower_storage.load(node_pubkey)?; - saved_tower.try_into_tower(node_pubkey) + tower_storage.load(node_pubkey) } } @@ -1290,7 +1388,7 @@ pub mod test { }, solana_vote_program::vote_state::{Vote, VoteStateVersions, MAX_LOCKOUT_HISTORY}, std::{ - collections::HashMap, + collections::{HashMap, VecDeque}, fs::{remove_file, OpenOptions}, io::{Read, Seek, SeekFrom, Write}, path::PathBuf, @@ -1331,17 +1429,29 @@ pub mod test { let vote = Vote::default(); let mut decision = SwitchForkDecision::FailedSwitchThreshold(0, 1); assert!(decision - .to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default()) + .to_vote_instruction( + VoteTransaction::from(vote.clone()), + &Pubkey::default(), + &Pubkey::default() + ) .is_none()); decision = SwitchForkDecision::FailedSwitchDuplicateRollback(0); assert!(decision - .to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default()) + .to_vote_instruction( + VoteTransaction::from(vote.clone()), + &Pubkey::default(), + &Pubkey::default() + ) .is_none()); decision = SwitchForkDecision::SameFork; assert_eq!( - decision.to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default()), + decision.to_vote_instruction( + VoteTransaction::from(vote.clone()), + &Pubkey::default(), + &Pubkey::default() + ), Some(vote_instruction::vote( &Pubkey::default(), &Pubkey::default(), @@ -1351,7 +1461,11 @@ pub mod test { decision = SwitchForkDecision::SwitchProof(Hash::default()); assert_eq!( - decision.to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default()), + decision.to_vote_instruction( + VoteTransaction::from(vote.clone()), + &Pubkey::default(), + &Pubkey::default() + ), Some(vote_instruction::vote_switch( &Pubkey::default(), &Pubkey::default(), @@ -1373,7 +1487,7 @@ pub mod test { // Set the voting behavior let mut cluster_votes = HashMap::new(); - let votes = vec![0, 1, 2, 3, 4, 5]; + let votes = vec![1, 2, 3, 4, 5]; cluster_votes.insert(node_pubkey, votes.clone()); vote_simulator.fill_bank_forks(forks, &cluster_votes, true); @@ -1384,9 +1498,12 @@ pub mod test { .is_empty()); } - for i in 0..5 { - assert_eq!(tower.vote_state.votes[i].slot as usize, i); - assert_eq!(tower.vote_state.votes[i].confirmation_count as usize, 6 - i); + for i in 1..5 { + assert_eq!(tower.vote_state.votes[i - 1].slot as usize, i); + assert_eq!( + tower.vote_state.votes[i - 1].confirmation_count as usize, + 6 - i + ); } } @@ -1846,7 +1963,7 @@ pub mod test { / (tr(44) // Minor fork 2 / (tr(45) / (tr(46)))) - / (tr(110))))); + / 
(tr(110))))); // Have two validators, each representing 20% of the stake vote on // minor fork 2 at slots 46 + 47 @@ -1918,7 +2035,7 @@ pub mod test { let mut my_votes: Vec = vec![]; let next_unlocked_slot = 110; // Vote on the first minor fork - my_votes.extend(0..=14); + my_votes.extend(1..=14); // Come back to the main fork my_votes.extend(43..=44); // Vote on the second minor fork @@ -2107,8 +2224,8 @@ pub mod test { #[test] fn test_is_locked_out_empty() { let tower = Tower::new_for_tests(0, 0.67); - let ancestors = HashSet::new(); - assert!(!tower.is_locked_out(0, &ancestors)); + let ancestors = HashSet::from([0]); + assert!(!tower.is_locked_out(1, &ancestors)); } #[test] @@ -2139,7 +2256,7 @@ pub mod test { #[test] fn test_check_recent_slot() { let mut tower = Tower::new_for_tests(0, 0.67); - assert!(tower.is_recent(0)); + assert!(tower.is_recent(1)); assert!(tower.is_recent(32)); for i in 0..64 { tower.record_vote(i, Hash::default()); @@ -2254,7 +2371,7 @@ pub mod test { let mut local = VoteState::default(); let vote = Tower::apply_vote_and_generate_vote_diff(&mut local, 0, Hash::default(), None); assert_eq!(local.votes.len(), 1); - assert_eq!(vote.slots, vec![0]); + assert_eq!(vote.slots(), vec![0]); assert_eq!(local.tower(), vec![0]); } @@ -2265,7 +2382,7 @@ pub mod test { // another vote for slot 0 should return an empty vote as the diff. let vote = Tower::apply_vote_and_generate_vote_diff(&mut local, 0, Hash::default(), Some(0)); - assert!(vote.slots.is_empty()); + assert!(vote.is_empty()); } #[test] @@ -2280,7 +2397,7 @@ pub mod test { assert_eq!(local.votes.len(), 1); let vote = Tower::apply_vote_and_generate_vote_diff(&mut local, 1, Hash::default(), Some(0)); - assert_eq!(vote.slots, vec![1]); + assert_eq!(vote.slots(), vec![1]); assert_eq!(local.tower(), vec![0, 1]); } @@ -2300,7 +2417,7 @@ pub mod test { // observable in any of the results. 
let vote = Tower::apply_vote_and_generate_vote_diff(&mut local, 3, Hash::default(), Some(0)); - assert_eq!(vote.slots, vec![3]); + assert_eq!(vote.slots(), vec![3]); assert_eq!(local.tower(), vec![3]); } @@ -2373,17 +2490,26 @@ pub mod test { fn vote_and_check_recent(num_votes: usize) { let mut tower = Tower::new_for_tests(1, 0.67); let slots = if num_votes > 0 { - vec![num_votes as u64 - 1] + { 0..num_votes } + .map(|i| Lockout { + slot: i as u64, + confirmation_count: (num_votes as u32) - (i as u32), + }) + .collect() } else { vec![] }; - let mut expected = Vote::new(slots, Hash::default()); + let mut expected = VoteStateUpdate::new( + VecDeque::from(slots), + if num_votes > 0 { Some(0) } else { None }, + Hash::default(), + ); for i in 0..num_votes { tower.record_vote(i as u64, Hash::default()); } - expected.timestamp = tower.last_vote.timestamp; - assert_eq!(expected, tower.last_vote) + expected.timestamp = tower.last_vote.timestamp(); + assert_eq!(VoteTransaction::from(expected), tower.last_vote) } #[test] @@ -2683,10 +2809,12 @@ pub mod test { .write(true) .open(path) .unwrap(); + // 4 is the offset into SavedTowerVersions for the signature + assert_eq!(file.seek(SeekFrom::Start(4)).unwrap(), 4); let mut buf = [0u8]; assert_eq!(file.read(&mut buf).unwrap(), 1); buf[0] = !buf[0]; - assert_eq!(file.seek(SeekFrom::Start(0)).unwrap(), 0); + assert_eq!(file.seek(SeekFrom::Start(4)).unwrap(), 4); assert_eq!(file.write(&buf).unwrap(), 1); }, ); @@ -3025,7 +3153,7 @@ pub mod test { tower.vote_state.votes.push_back(Lockout::new(1)); tower.vote_state.votes.push_back(Lockout::new(0)); let vote = Vote::new(vec![0], Hash::default()); - tower.last_vote = vote; + tower.last_vote = VoteTransaction::from(vote); let mut slot_history = SlotHistory::default(); slot_history.add(0); @@ -3043,7 +3171,7 @@ pub mod test { tower.vote_state.votes.push_back(Lockout::new(1)); tower.vote_state.votes.push_back(Lockout::new(2)); let vote = Vote::new(vec![2], Hash::default()); - tower.last_vote = vote; + tower.last_vote = VoteTransaction::from(vote); let mut slot_history = SlotHistory::default(); slot_history.add(0); @@ -3068,7 +3196,7 @@ pub mod test { tower.vote_state.votes.push_back(Lockout::new(0)); tower.vote_state.votes.push_back(Lockout::new(1)); let vote = Vote::new(vec![1], Hash::default()); - tower.last_vote = vote; + tower.last_vote = VoteTransaction::from(vote); let mut slot_history = SlotHistory::default(); slot_history.add(MAX_ENTRIES); @@ -3087,7 +3215,7 @@ pub mod test { tower.vote_state.votes.push_back(Lockout::new(2)); tower.vote_state.votes.push_back(Lockout::new(1)); let vote = Vote::new(vec![1], Hash::default()); - tower.last_vote = vote; + tower.last_vote = VoteTransaction::from(vote); let mut slot_history = SlotHistory::default(); slot_history.add(0); @@ -3106,7 +3234,7 @@ pub mod test { tower.vote_state.votes.push_back(Lockout::new(3)); tower.vote_state.votes.push_back(Lockout::new(3)); let vote = Vote::new(vec![3], Hash::default()); - tower.last_vote = vote; + tower.last_vote = VoteTransaction::from(vote); let mut slot_history = SlotHistory::default(); slot_history.add(0); @@ -3125,7 +3253,7 @@ pub mod test { tower.vote_state.votes.push_back(Lockout::new(43)); tower.vote_state.votes.push_back(Lockout::new(44)); let vote = Vote::new(vec![44], Hash::default()); - tower.last_vote = vote; + tower.last_vote = VoteTransaction::from(vote); let mut slot_history = SlotHistory::default(); slot_history.add(42); @@ -3139,7 +3267,7 @@ pub mod test { let mut tower = Tower::new_for_tests(10, 0.9); 
tower.vote_state.votes.push_back(Lockout::new(0)); let vote = Vote::new(vec![0], Hash::default()); - tower.last_vote = vote; + tower.last_vote = VoteTransaction::from(vote); let mut slot_history = SlotHistory::default(); slot_history.add(0); @@ -3153,7 +3281,7 @@ pub mod test { tower.vote_state.votes.push_back(Lockout::new(13)); tower.vote_state.votes.push_back(Lockout::new(14)); let vote = Vote::new(vec![14], Hash::default()); - tower.last_vote = vote; + tower.last_vote = VoteTransaction::from(vote); tower.initialize_root(12); let mut slot_history = SlotHistory::default(); diff --git a/core/src/lib.rs b/core/src/lib.rs index ec1be4e61e..5a0a593146 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -54,6 +54,7 @@ pub mod sigverify_stage; pub mod snapshot_packager_service; pub mod stats_reporter_service; pub mod system_monitor_service; +mod tower1_7_14; pub mod tower_storage; pub mod tpu; pub mod tree_diff; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index b88c3f0a0f..0608770a73 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -21,7 +21,7 @@ use { progress_map::{ForkProgress, ProgressMap, PropagatedStats}, repair_service::DuplicateSlotsResetReceiver, rewards_recorder_service::RewardsRecorderSender, - tower_storage::{SavedTower, TowerStorage}, + tower_storage::{SavedTower, SavedTowerVersions, TowerStorage}, unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, voting_service::VoteOp, window_service::DuplicateSlotReceiver, @@ -64,7 +64,7 @@ use { timing::timestamp, transaction::Transaction, }, - solana_vote_program::vote_state::Vote, + solana_vote_program::vote_state::VoteTransaction, std::{ collections::{HashMap, HashSet}, result, @@ -425,6 +425,7 @@ impl ReplayStage { last_refresh_time: Instant::now(), last_print_time: Instant::now(), }; + loop { // Stop getting entries if we get exit signal if exit.load(Ordering::Relaxed) { @@ -561,7 +562,7 @@ impl ReplayStage { &vote_account, &ancestors, &mut frozen_banks, - &tower, + &mut tower, &mut progress, &vote_tracker, &cluster_slots, @@ -1823,7 +1824,7 @@ impl ReplayStage { bank: &Bank, vote_account_pubkey: &Pubkey, authorized_voter_keypairs: &[Arc], - vote: Vote, + vote: VoteTransaction, switch_fork_decision: &SwitchForkDecision, vote_signatures: &mut Vec, has_new_vote_been_rooted: bool, @@ -2026,7 +2027,7 @@ impl ReplayStage { .send(VoteOp::PushVote { tx: vote_tx, tower_slots, - saved_tower, + saved_tower: SavedTowerVersions::from(saved_tower), }) .unwrap_or_else(|err| warn!("Error: {:?}", err)); } @@ -2299,7 +2300,7 @@ impl ReplayStage { my_vote_pubkey: &Pubkey, ancestors: &HashMap>, frozen_banks: &mut Vec>, - tower: &Tower, + tower: &mut Tower, progress: &mut ProgressMap, vote_tracker: &VoteTracker, cluster_slots: &ClusterSlots, @@ -2320,6 +2321,70 @@ impl ReplayStage { .expect("All frozen banks must exist in the Progress map") .computed; if !is_computed { + // Check if our tower is behind, if so (and the feature migration flag is in use) + // overwrite with the newer bank. + if let (true, Some((_, vote_account))) = ( + Tower::is_direct_vote_state_update_enabled(bank), + bank.get_vote_account(my_vote_pubkey), + ) { + if let Some(mut bank_vote_state) = + vote_account.vote_state().as_ref().ok().cloned() + { + if bank_vote_state.last_voted_slot() + > tower.vote_state.last_voted_slot() + { + info!( + "Frozen bank vote state slot {:?} + is newer than our local vote state slot {:?}, + adopting the bank vote state as our own. 
+ Bank votes: {:?}, root: {:?}, + Local votes: {:?}, root: {:?}", + bank_vote_state.last_voted_slot(), + tower.vote_state.last_voted_slot(), + bank_vote_state.votes, + bank_vote_state.root_slot, + tower.vote_state.votes, + tower.vote_state.root_slot + ); + + if let Some(local_root) = tower.vote_state.root_slot { + if bank_vote_state + .root_slot + .map(|bank_root| local_root > bank_root) + .unwrap_or(true) + { + // If the local root is larger than this on chain vote state + // root (possible due to supermajority roots being set on + // startup), then we need to adjust the tower + bank_vote_state.root_slot = Some(local_root); + bank_vote_state + .votes + .retain(|lockout| lockout.slot > local_root); + info!( + "Local root is larger than on chain root, + overwrote bank root {:?} and updated votes {:?}", + bank_vote_state.root_slot, bank_vote_state.votes + ); + + if let Some(first_vote) = bank_vote_state.votes.front() { + assert!(ancestors + .get(&first_vote.slot) + .expect( + "Ancestors map must contain an + entry for all slots on this fork + greater than `local_root` and less + than `bank_slot`" + ) + .contains(&local_root)); + } + } + } + + tower.vote_state.root_slot = bank_vote_state.root_slot; + tower.vote_state.votes = bank_vote_state.votes; + } + } + } let computed_bank_state = Tower::collect_vote_lockouts( my_vote_pubkey, bank_slot, @@ -3990,12 +4055,12 @@ pub mod tests { .values() .cloned() .collect(); - let tower = Tower::new_for_tests(0, 0.67); + let mut tower = Tower::new_for_tests(0, 0.67); let newly_computed = ReplayStage::compute_bank_stats( &my_vote_pubkey, &ancestors, &mut frozen_banks, - &tower, + &mut tower, &mut progress, &VoteTracker::default(), &ClusterSlots::default(), @@ -4039,7 +4104,7 @@ pub mod tests { &my_vote_pubkey, &ancestors, &mut frozen_banks, - &tower, + &mut tower, &mut progress, &VoteTracker::default(), &ClusterSlots::default(), @@ -4075,7 +4140,7 @@ pub mod tests { &my_vote_pubkey, &ancestors, &mut frozen_banks, - &tower, + &mut tower, &mut progress, &VoteTracker::default(), &ClusterSlots::default(), @@ -4091,7 +4156,7 @@ pub mod tests { fn test_same_weight_select_lower_slot() { // Init state let mut vote_simulator = VoteSimulator::new(1); - let tower = Tower::default(); + let mut tower = Tower::default(); // Create the tree of banks in a BankForks object let forks = tr(0) / (tr(1)) / (tr(2)); @@ -4114,7 +4179,7 @@ pub mod tests { &my_vote_pubkey, &ancestors, &mut frozen_banks, - &tower, + &mut tower, &mut vote_simulator.progress, &VoteTracker::default(), &ClusterSlots::default(), @@ -4170,7 +4235,7 @@ pub mod tests { // Set the voting behavior let mut cluster_votes = HashMap::new(); - let votes = vec![0, 2]; + let votes = vec![2]; cluster_votes.insert(my_node_pubkey, votes.clone()); vote_simulator.fill_bank_forks(forks, &cluster_votes, true); @@ -4195,7 +4260,7 @@ pub mod tests { &my_vote_pubkey, &vote_simulator.bank_forks.read().unwrap().ancestors(), &mut frozen_banks, - &tower, + &mut tower, &mut vote_simulator.progress, &VoteTracker::default(), &ClusterSlots::default(), @@ -5110,12 +5175,12 @@ pub mod tests { ); // Update propagation status - let tower = Tower::new_for_tests(0, 0.67); + let mut tower = Tower::new_for_tests(0, 0.67); ReplayStage::compute_bank_stats( &validator_node_to_vote_keys[&my_pubkey], &ancestors, &mut frozen_banks, - &tower, + &mut tower, &mut progress, &vote_tracker, &ClusterSlots::default(), @@ -5498,7 +5563,7 @@ pub mod tests { &Pubkey::new_unique(), &ancestors, &mut frozen_banks, - &tower, + &mut tower, &mut progress, 
&VoteTracker::default(), &ClusterSlots::default(), @@ -5625,7 +5690,7 @@ pub mod tests { &Pubkey::new_unique(), &ancestors, &mut frozen_banks, - &tower, + &mut tower, &mut progress, &VoteTracker::default(), &ClusterSlots::default(), diff --git a/core/src/tower1_7_14.rs b/core/src/tower1_7_14.rs new file mode 100644 index 0000000000..f092932816 --- /dev/null +++ b/core/src/tower1_7_14.rs @@ -0,0 +1,64 @@ +use crate::consensus::{SwitchForkDecision, TowerError}; +use solana_sdk::{ + clock::Slot, + hash::Hash, + pubkey::Pubkey, + signature::{Signature, Signer}, +}; +use solana_vote_program::vote_state::{BlockTimestamp, Vote, VoteState}; + +#[frozen_abi(digest = "7phMrqmBo2D3rXPdhBj8CpjRvvmx9qgpcU4cDGkL3W9q")] +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)] +pub struct Tower1_7_14 { + pub(crate) node_pubkey: Pubkey, + pub(crate) threshold_depth: usize, + pub(crate) threshold_size: f64, + pub(crate) vote_state: VoteState, + pub(crate) last_vote: Vote, + #[serde(skip)] + // The blockhash used in the last vote transaction, may or may not equal the + // blockhash of the voted block itself, depending if the vote slot was refreshed. + // For instance, a vote for slot 5, may be refreshed/resubmitted for inclusion in + // block 10, in which case `last_vote_tx_blockhash` equals the blockhash of 10, not 5. + pub(crate) last_vote_tx_blockhash: Hash, + pub(crate) last_timestamp: BlockTimestamp, + #[serde(skip)] + // Restored last voted slot which cannot be found in SlotHistory at replayed root + // (This is a special field for slashing-free validator restart with edge cases). + // This could be emptied after some time; but left intact indefinitely for easier + // implementation + // Further, stray slot can be stale or not. `Stale` here means whether given + // bank_forks (=~ ledger) lacks the slot or not. 
+ pub(crate) stray_restored_slot: Option, + #[serde(skip)] + pub(crate) last_switch_threshold_check: Option<(Slot, SwitchForkDecision)>, +} + +#[frozen_abi(digest = "CxwFFxKfn6ez6wifDKr5WYr3eu2PsWUKdMYp3LX8Xj52")] +#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)] +pub struct SavedTower1_7_14 { + pub(crate) signature: Signature, + pub(crate) data: Vec, + #[serde(skip)] + pub(crate) node_pubkey: Pubkey, +} + +impl SavedTower1_7_14 { + pub fn new(tower: &Tower1_7_14, keypair: &T) -> Result { + let node_pubkey = keypair.pubkey(); + if tower.node_pubkey != node_pubkey { + return Err(TowerError::WrongTower(format!( + "node_pubkey is {:?} but found tower for {:?}", + node_pubkey, tower.node_pubkey + ))); + } + + let data = bincode::serialize(tower)?; + let signature = keypair.sign_message(&data); + Ok(Self { + signature, + data, + node_pubkey, + }) + } +} diff --git a/core/src/tower_storage.rs b/core/src/tower_storage.rs index d3779ea1a8..ff37e745ee 100644 --- a/core/src/tower_storage.rs +++ b/core/src/tower_storage.rs @@ -1,5 +1,6 @@ use { - crate::consensus::{Result, Tower, TowerError}, + crate::consensus::{Result, Tower, TowerError, TowerVersions}, + crate::tower1_7_14::SavedTower1_7_14, solana_sdk::{ pubkey::Pubkey, signature::{Signature, Signer}, @@ -12,6 +13,67 @@ use { }, }; +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)] +pub enum SavedTowerVersions { + V1_17_14(SavedTower1_7_14), + Current(SavedTower), +} + +impl SavedTowerVersions { + fn try_into_tower(&self, node_pubkey: &Pubkey) -> Result { + // This method assumes that `self` was just deserialized + assert_eq!(self.pubkey(), Pubkey::default()); + + let tv = match self { + SavedTowerVersions::V1_17_14(t) => { + if !t.signature.verify(node_pubkey.as_ref(), &t.data) { + return Err(TowerError::InvalidSignature); + } + bincode::deserialize(&t.data).map(TowerVersions::V1_17_14) + } + SavedTowerVersions::Current(t) => { + if !t.signature.verify(node_pubkey.as_ref(), &t.data) { + return Err(TowerError::InvalidSignature); + } + bincode::deserialize(&t.data).map(TowerVersions::Current) + } + }; + tv.map_err(|e| e.into()).and_then(|tv: TowerVersions| { + let tower = tv.convert_to_current(); + if tower.node_pubkey != *node_pubkey { + return Err(TowerError::WrongTower(format!( + "node_pubkey is {:?} but found tower for {:?}", + node_pubkey, tower.node_pubkey + ))); + } + Ok(tower) + }) + } + + fn serialize_into(&self, file: &mut File) -> Result<()> { + bincode::serialize_into(file, self).map_err(|e| e.into()) + } + + fn pubkey(&self) -> Pubkey { + match self { + SavedTowerVersions::V1_17_14(t) => t.node_pubkey, + SavedTowerVersions::Current(t) => t.node_pubkey, + } + } +} + +impl From for SavedTowerVersions { + fn from(tower: SavedTower) -> SavedTowerVersions { + SavedTowerVersions::Current(tower) + } +} + +impl From for SavedTowerVersions { + fn from(tower: SavedTower1_7_14) -> SavedTowerVersions { + SavedTowerVersions::V1_17_14(tower) + } +} + #[frozen_abi(digest = "Gaxfwvx5MArn52mKZQgzHmDCyn5YfCuTHvp5Et3rFfpp")] #[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)] pub struct SavedTower { @@ -39,45 +101,25 @@ impl SavedTower { node_pubkey, }) } - - pub fn try_into_tower(self, node_pubkey: &Pubkey) -> Result { - // This method assumes that `self` was just deserialized - assert_eq!(self.node_pubkey, Pubkey::default()); - - if !self.signature.verify(node_pubkey.as_ref(), &self.data) { - return Err(TowerError::InvalidSignature); - } - bincode::deserialize(&self.data) 
- .map_err(|e| e.into()) - .and_then(|tower: Tower| { - if tower.node_pubkey != *node_pubkey { - return Err(TowerError::WrongTower(format!( - "node_pubkey is {:?} but found tower for {:?}", - node_pubkey, tower.node_pubkey - ))); - } - Ok(tower) - }) - } } pub trait TowerStorage: Sync + Send { - fn load(&self, node_pubkey: &Pubkey) -> Result; - fn store(&self, saved_tower: &SavedTower) -> Result<()>; + fn load(&self, node_pubkey: &Pubkey) -> Result; + fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()>; } #[derive(Debug, Default, Clone, PartialEq)] pub struct NullTowerStorage {} impl TowerStorage for NullTowerStorage { - fn load(&self, _node_pubkey: &Pubkey) -> Result { + fn load(&self, _node_pubkey: &Pubkey) -> Result { Err(TowerError::IoError(io::Error::new( io::ErrorKind::Other, "NullTowerStorage::load() not available", ))) } - fn store(&self, _saved_tower: &SavedTower) -> Result<()> { + fn store(&self, _saved_tower: &SavedTowerVersions) -> Result<()> { Ok(()) } } @@ -92,35 +134,75 @@ impl FileTowerStorage { Self { tower_path } } - pub fn filename(&self, node_pubkey: &Pubkey) -> PathBuf { + // Old filename for towers pre 1.9 (VoteStateUpdate) + pub fn old_filename(&self, node_pubkey: &Pubkey) -> PathBuf { self.tower_path .join(format!("tower-{}", node_pubkey)) .with_extension("bin") } + + pub fn filename(&self, node_pubkey: &Pubkey) -> PathBuf { + self.tower_path + .join(format!("tower-1_9-{}", node_pubkey)) + .with_extension("bin") + } + + #[cfg(test)] + fn store_old(&self, saved_tower: &SavedTower1_7_14) -> Result<()> { + let pubkey = saved_tower.node_pubkey; + let filename = self.old_filename(&pubkey); + trace!("store: {}", filename.display()); + let new_filename = filename.with_extension("bin.new"); + + { + // overwrite anything if exists + let file = File::create(&new_filename)?; + bincode::serialize_into(file, saved_tower)?; + // file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster! 
+ } + fs::rename(&new_filename, &filename)?; + // self.path.parent().sync_all() hurts performance same as the above sync + Ok(()) + } } impl TowerStorage for FileTowerStorage { - fn load(&self, node_pubkey: &Pubkey) -> Result { + fn load(&self, node_pubkey: &Pubkey) -> Result { let filename = self.filename(node_pubkey); trace!("load {}", filename.display()); // Ensure to create parent dir here, because restore() precedes save() always fs::create_dir_all(&filename.parent().unwrap())?; - let file = File::open(&filename)?; - let mut stream = BufReader::new(file); - bincode::deserialize_from(&mut stream).map_err(|e| e.into()) + if let Ok(file) = File::open(&filename) { + // New format + let mut stream = BufReader::new(file); + + bincode::deserialize_from(&mut stream) + .map_err(|e| e.into()) + .and_then(|t: SavedTowerVersions| t.try_into_tower(node_pubkey)) + } else { + // Old format + let file = File::open(&self.old_filename(node_pubkey))?; + let mut stream = BufReader::new(file); + bincode::deserialize_from(&mut stream) + .map_err(|e| e.into()) + .and_then(|t: SavedTower1_7_14| { + SavedTowerVersions::from(t).try_into_tower(node_pubkey) + }) + } } - fn store(&self, saved_tower: &SavedTower) -> Result<()> { - let filename = self.filename(&saved_tower.node_pubkey); + fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()> { + let pubkey = saved_tower.pubkey(); + let filename = self.filename(&pubkey); trace!("store: {}", filename.display()); let new_filename = filename.with_extension("bin.new"); { // overwrite anything if exists let mut file = File::create(&new_filename)?; - bincode::serialize_into(&mut file, saved_tower)?; + saved_tower.serialize_into(&mut file)?; // file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster! 
} fs::rename(&new_filename, &filename)?; @@ -194,7 +276,7 @@ impl EtcdTowerStorage { } impl TowerStorage for EtcdTowerStorage { - fn load(&self, node_pubkey: &Pubkey) -> Result { + fn load(&self, node_pubkey: &Pubkey) -> Result { let (instance_key, tower_key) = Self::get_keys(node_pubkey); let mut client = self.client.write().unwrap(); @@ -236,7 +318,9 @@ impl TowerStorage for EtcdTowerStorage { for op_response in response.op_responses() { if let etcd_client::TxnOpResponse::Get(get_response) = op_response { if let Some(kv) = get_response.kvs().get(0) { - return bincode::deserialize_from(kv.value()).map_err(|e| e.into()); + return bincode::deserialize_from(kv.value()) + .map_err(|e| e.into()) + .and_then(|t: SavedTowerVersions| t.try_into_tower(node_pubkey)); } } } @@ -248,8 +332,8 @@ impl TowerStorage for EtcdTowerStorage { ))) } - fn store(&self, saved_tower: &SavedTower) -> Result<()> { - let (instance_key, tower_key) = Self::get_keys(&saved_tower.node_pubkey); + fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()> { + let (instance_key, tower_key) = Self::get_keys(&saved_tower.pubkey()); let mut client = self.client.write().unwrap(); let txn = etcd_client::Txn::new() @@ -260,7 +344,7 @@ impl TowerStorage for EtcdTowerStorage { )]) .and_then(vec![etcd_client::TxnOp::put( tower_key, - bincode::serialize(saved_tower)?, + bincode::serialize(&saved_tower)?, None, )]); @@ -276,9 +360,63 @@ impl TowerStorage for EtcdTowerStorage { if !response.succeeded() { return Err(TowerError::IoError(io::Error::new( io::ErrorKind::Other, - format!("Lost etcd instance lock for {}", saved_tower.node_pubkey), + format!("Lost etcd instance lock for {}", saved_tower.pubkey()), ))); } Ok(()) } } + +#[cfg(test)] +pub mod test { + use { + super::*, + crate::{ + consensus::Tower, + tower1_7_14::{SavedTower1_7_14, Tower1_7_14}, + }, + solana_sdk::{hash::Hash, signature::Keypair}, + solana_vote_program::vote_state::{ + BlockTimestamp, Lockout, Vote, VoteState, VoteTransaction, MAX_LOCKOUT_HISTORY, + }, + tempfile::TempDir, + }; + + #[test] + fn test_tower_migration() { + let tower_path = TempDir::new().unwrap(); + let identity_keypair = Keypair::new(); + let node_pubkey = identity_keypair.pubkey(); + let mut vote_state = VoteState::default(); + vote_state + .votes + .resize(MAX_LOCKOUT_HISTORY, Lockout::default()); + vote_state.root_slot = Some(1); + + let vote = Vote::new(vec![1, 2, 3, 4], Hash::default()); + let tower_storage = FileTowerStorage::new(tower_path.path().to_path_buf()); + + let old_tower = Tower1_7_14 { + node_pubkey, + threshold_depth: 10, + threshold_size: 0.9, + vote_state, + last_vote: vote.clone(), + last_timestamp: BlockTimestamp::default(), + last_vote_tx_blockhash: Hash::default(), + stray_restored_slot: Some(2), + last_switch_threshold_check: Option::default(), + }; + + { + let saved_tower = SavedTower1_7_14::new(&old_tower, &identity_keypair).unwrap(); + tower_storage.store_old(&saved_tower).unwrap(); + } + + let loaded = Tower::restore(&tower_storage, &node_pubkey).unwrap(); + assert_eq!(loaded.node_pubkey, old_tower.node_pubkey); + assert_eq!(loaded.last_vote(), VoteTransaction::from(vote)); + assert_eq!(loaded.vote_state.root_slot, Some(1)); + assert_eq!(loaded.stray_restored_slot(), None); + } +} diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index b2b3974807..7ab6d9e9e3 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -1,5 +1,5 @@ use { - crate::tower_storage::{SavedTower, TowerStorage}, + 
crate::tower_storage::{SavedTowerVersions, TowerStorage}, crossbeam_channel::Receiver, solana_gossip::cluster_info::ClusterInfo, solana_measure::measure::Measure, @@ -16,7 +16,7 @@ pub enum VoteOp { PushVote { tx: Transaction, tower_slots: Vec, - saved_tower: SavedTower, + saved_tower: SavedTowerVersions, }, RefreshVote { tx: Transaction, diff --git a/local-cluster/tests/common.rs b/local-cluster/tests/common.rs index 34615f1c33..cfa26742c2 100644 --- a/local-cluster/tests/common.rs +++ b/local-cluster/tests/common.rs @@ -5,7 +5,7 @@ use { solana_core::{ broadcast_stage::BroadcastStageType, consensus::{Tower, SWITCH_FORK_THRESHOLD}, - tower_storage::FileTowerStorage, + tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage}, validator::ValidatorConfig, }, solana_gossip::gossip_service::discover_cluster, @@ -406,3 +406,15 @@ pub fn test_faulty_node( (cluster, validator_keys) } + +pub fn save_tower(tower_path: &Path, tower: &Tower, node_keypair: &Keypair) { + let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); + let saved_tower = SavedTower::new(tower, node_keypair).unwrap(); + file_tower_storage + .store(&SavedTowerVersions::from(saved_tower)) + .unwrap(); +} + +pub fn root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option { + restore_tower(tower_path, node_pubkey).map(|tower| tower.root()) +} diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 6a7f390ad0..4ade36b42b 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -3,8 +3,8 @@ use { assert_matches::assert_matches, common::{ copy_blocks, create_custom_leader_schedule, last_vote_in_tower, ms_for_n_slots, - open_blockstore, purge_slots, remove_tower, restore_tower, run_cluster_partition, - run_kill_partition_switch_threshold, test_faulty_node, + open_blockstore, purge_slots, remove_tower, restore_tower, root_in_tower, + run_cluster_partition, run_kill_partition_switch_threshold, save_tower, test_faulty_node, wait_for_last_vote_in_tower_to_land_in_ledger, RUST_LOG_FILTER, }, crossbeam_channel::{unbounded, Receiver}, @@ -23,7 +23,7 @@ use { consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, replay_stage::DUPLICATE_THRESHOLD, - tower_storage::{FileTowerStorage, SavedTower, TowerStorage}, + tower_storage::FileTowerStorage, validator::ValidatorConfig, }, solana_download_utils::download_snapshot_archive, @@ -1913,16 +1913,6 @@ fn test_validator_saves_tower() { assert!(tower4.root() >= new_root); } -fn save_tower(tower_path: &Path, tower: &Tower, node_keypair: &Keypair) { - let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); - let saved_tower = SavedTower::new(tower, node_keypair).unwrap(); - file_tower_storage.store(&saved_tower).unwrap(); -} - -fn root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option { - restore_tower(tower_path, node_pubkey).map(|tower| tower.root()) -} - // This test verifies that even if votes from a validator end up taking too long to land, and thus // some of the referenced slots are slots are no longer present in the slot hashes sysvar, // consensus can still be attained. 
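// A hedged usage sketch (not part of this patch) for the `save_tower` and
// `root_in_tower` helpers relocated into local-cluster/tests/common.rs above;
// the `ledger_path`, `tower`, `node_keypair`, and `node_pubkey` bindings are
// hypothetical test-local values.
//
//     save_tower(&ledger_path, &tower, &node_keypair);       // stores as SavedTowerVersions::Current
//     let root = root_in_tower(&ledger_path, &node_pubkey);  // restores the tower and reads its root
//     assert_eq!(root, Some(tower.root()));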
@@ -2318,6 +2308,14 @@ fn test_hard_fork_invalidates_tower() { validator_b_info.config.wait_for_supermajority = Some(hard_fork_slot); validator_b_info.config.expected_shred_version = Some(expected_shred_version); + // Clear ledger of all slots post hard fork + { + let blockstore_a = open_blockstore(&validator_a_info.info.ledger_path); + let blockstore_b = open_blockstore(&validator_b_info.info.ledger_path); + purge_slots(&blockstore_a, hard_fork_slot + 1, 100); + purge_slots(&blockstore_b, hard_fork_slot + 1, 100); + } + // restart validator A first let cluster_for_a = cluster.clone(); // Spawn a thread because wait_for_supermajority blocks in Validator::new()! @@ -2374,91 +2372,6 @@ fn test_run_test_load_program_accounts_root() { run_test_load_program_accounts(CommitmentConfig::finalized()); } -#[test] -#[serial] -fn test_restart_tower_rollback() { - // Test node crashing and failing to save its tower before restart - solana_logger::setup_with_default(RUST_LOG_FILTER); - - // First set up the cluster with 4 nodes - let slots_per_epoch = 2048; - let node_stakes = vec![10000, 1]; - - let validator_strings = vec![ - "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4", - "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8", - ]; - - let validator_b_keypair = Arc::new(Keypair::from_base58_string(validator_strings[1])); - let validator_b_pubkey = validator_b_keypair.pubkey(); - - let validator_keys = validator_strings - .iter() - .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) - .take(node_stakes.len()) - .collect::>(); - let mut config = ClusterConfig { - cluster_lamports: 100_000, - node_stakes: node_stakes.clone(), - validator_configs: make_identical_validator_configs( - &ValidatorConfig::default_for_test(), - node_stakes.len(), - ), - validator_keys: Some(validator_keys), - slots_per_epoch, - stakers_slot_offset: slots_per_epoch, - skip_warmup_slots: true, - ..ClusterConfig::default() - }; - let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); - - let val_b_ledger_path = cluster.ledger_path(&validator_b_pubkey); - - let mut earlier_tower: Tower; - loop { - sleep(Duration::from_millis(1000)); - - // Grab the current saved tower - earlier_tower = restore_tower(&val_b_ledger_path, &validator_b_pubkey).unwrap(); - if earlier_tower.last_voted_slot().unwrap_or(0) > 1 { - break; - } - } - - let exited_validator_info: ClusterValidatorInfo; - loop { - sleep(Duration::from_millis(1000)); - - // Wait for second, lesser staked validator to make a root past the earlier_tower's - // latest vote slot, then exit that validator - if let Some(root) = root_in_tower(&val_b_ledger_path, &validator_b_pubkey) { - if root - > earlier_tower - .last_voted_slot() - .expect("Earlier tower must have at least one vote") - { - exited_validator_info = cluster.exit_node(&validator_b_pubkey); - break; - } - } - } - - // Now rewrite the tower with the *earlier_tower* - save_tower(&val_b_ledger_path, &earlier_tower, &validator_b_keypair); - cluster.restart_node( - &validator_b_pubkey, - exited_validator_info, - SocketAddrSpace::Unspecified, - ); - - // Check this node is making new roots - cluster.check_for_new_roots( - 20, - "test_restart_tower_rollback", - SocketAddrSpace::Unspecified, - ); -} - #[test] #[serial] fn test_run_test_load_program_accounts_partition_root() { diff --git a/local-cluster/tests/local_cluster_flakey.rs b/local-cluster/tests/local_cluster_flakey.rs index 09a5b43f77..b4f413d867 100644 --- 
a/local-cluster/tests/local_cluster_flakey.rs +++ b/local-cluster/tests/local_cluster_flakey.rs @@ -3,19 +3,19 @@ #![allow(clippy::integer_arithmetic)] use { common::{ - copy_blocks, last_vote_in_tower, open_blockstore, purge_slots, remove_tower, - wait_for_last_vote_in_tower_to_land_in_ledger, RUST_LOG_FILTER, + copy_blocks, last_vote_in_tower, open_blockstore, purge_slots, remove_tower, restore_tower, + root_in_tower, save_tower, wait_for_last_vote_in_tower_to_land_in_ledger, RUST_LOG_FILTER, }, log::*, serial_test::serial, - solana_core::validator::ValidatorConfig, + solana_core::{consensus::Tower, validator::ValidatorConfig}, solana_ledger::{ ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db::{AccessType, BlockstoreOptions}, }, solana_local_cluster::{ - cluster::Cluster, + cluster::{Cluster, ClusterValidatorInfo}, local_cluster::{ClusterConfig, LocalCluster}, validator_configs::*, }, @@ -358,3 +358,89 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b info!("THIS TEST expected no violation. And indeed, there was none, thanks to persisted tower."); } } + +#[test] +#[serial] +#[ignore] +fn test_restart_tower_rollback() { + // Test node crashing and failing to save its tower before restart + solana_logger::setup_with_default(RUST_LOG_FILTER); + + // First set up the cluster with 4 nodes + let slots_per_epoch = 2048; + let node_stakes = vec![10000, 1]; + + let validator_strings = vec![ + "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4", + "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8", + ]; + + let validator_b_keypair = Arc::new(Keypair::from_base58_string(validator_strings[1])); + let validator_b_pubkey = validator_b_keypair.pubkey(); + + let validator_keys = validator_strings + .iter() + .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) + .take(node_stakes.len()) + .collect::>(); + let mut config = ClusterConfig { + cluster_lamports: 100_000, + node_stakes: node_stakes.clone(), + validator_configs: make_identical_validator_configs( + &ValidatorConfig::default_for_test(), + node_stakes.len(), + ), + validator_keys: Some(validator_keys), + slots_per_epoch, + stakers_slot_offset: slots_per_epoch, + skip_warmup_slots: true, + ..ClusterConfig::default() + }; + let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); + + let val_b_ledger_path = cluster.ledger_path(&validator_b_pubkey); + + let mut earlier_tower: Tower; + loop { + sleep(Duration::from_millis(1000)); + + // Grab the current saved tower + earlier_tower = restore_tower(&val_b_ledger_path, &validator_b_pubkey).unwrap(); + if earlier_tower.last_voted_slot().unwrap_or(0) > 1 { + break; + } + } + + let exited_validator_info: ClusterValidatorInfo; + loop { + sleep(Duration::from_millis(1000)); + + // Wait for second, lesser staked validator to make a root past the earlier_tower's + // latest vote slot, then exit that validator + if let Some(root) = root_in_tower(&val_b_ledger_path, &validator_b_pubkey) { + if root + > earlier_tower + .last_voted_slot() + .expect("Earlier tower must have at least one vote") + { + exited_validator_info = cluster.exit_node(&validator_b_pubkey); + break; + } + } + } + + // Now rewrite the tower with the *earlier_tower* + save_tower(&val_b_ledger_path, &earlier_tower, &validator_b_keypair); + cluster.restart_node( + &validator_b_pubkey, + exited_validator_info, + SocketAddrSpace::Unspecified, + ); + + // Check this node is making new 
diff --git a/programs/vote/src/vote_error.rs b/programs/vote/src/vote_error.rs
index 55bd9b117d..be4855d120 100644
--- a/programs/vote/src/vote_error.rs
+++ b/programs/vote/src/vote_error.rs
@@ -61,6 +61,9 @@ pub enum VoteError {
 
     #[error("every slot in the vote was older than the SlotHashes history")]
     VotesTooOldAllFiltered,
+
+    #[error("Proposed root is not in slot hashes")]
+    RootOnDifferentFork,
 }
 
 impl<E> DecodeError<E> for VoteError {
diff --git a/programs/vote/src/vote_state/mod.rs b/programs/vote/src/vote_state/mod.rs
index ceaebe3269..f986616e07 100644
--- a/programs/vote/src/vote_state/mod.rs
+++ b/programs/vote/src/vote_state/mod.rs
@@ -20,7 +20,6 @@ use {
         sysvar::clock::Clock,
     },
     std::{
-        boxed::Box,
         cmp::Ordering,
         collections::{HashSet, VecDeque},
         fmt::Debug,
@@ -41,6 +40,93 @@ pub const MAX_EPOCH_CREDITS_HISTORY: usize = 64;
 // Offset of VoteState::prior_voters, for determining initialization status without deserialization
 const DEFAULT_PRIOR_VOTERS_OFFSET: usize = 82;
 
+#[frozen_abi(digest = "6LBwH5w3WyAWZhsM3KTG9QZP7nYBhcC61K33kHR6gMAD")]
+#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, AbiEnumVisitor, AbiExample)]
+pub enum VoteTransaction {
+    Vote(Vote),
+    VoteStateUpdate(VoteStateUpdate),
+}
+
+impl VoteTransaction {
+    pub fn slots(&self) -> Vec<Slot> {
+        match self {
+            VoteTransaction::Vote(vote) => vote.slots.clone(),
+            VoteTransaction::VoteStateUpdate(vote_state_update) => vote_state_update.slots(),
+        }
+    }
+
+    pub fn slot(&self, i: usize) -> Slot {
+        match self {
+            VoteTransaction::Vote(vote) => vote.slots[i],
+            VoteTransaction::VoteStateUpdate(vote_state_update) => {
+                vote_state_update.lockouts[i].slot
+            }
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        match self {
+            VoteTransaction::Vote(vote) => vote.slots.len(),
+            VoteTransaction::VoteStateUpdate(vote_state_update) => vote_state_update.lockouts.len(),
+        }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        match self {
+            VoteTransaction::Vote(vote) => vote.slots.is_empty(),
+            VoteTransaction::VoteStateUpdate(vote_state_update) => {
+                vote_state_update.lockouts.is_empty()
+            }
+        }
+    }
+
+    pub fn hash(&self) -> Hash {
+        match self {
+            VoteTransaction::Vote(vote) => vote.hash,
+            VoteTransaction::VoteStateUpdate(vote_state_update) => vote_state_update.hash,
+        }
+    }
+
+    pub fn timestamp(&self) -> Option<UnixTimestamp> {
+        match self {
+            VoteTransaction::Vote(vote) => vote.timestamp,
+            VoteTransaction::VoteStateUpdate(vote_state_update) => vote_state_update.timestamp,
+        }
+    }
+
+    pub fn set_timestamp(&mut self, ts: Option<UnixTimestamp>) {
+        match self {
+            VoteTransaction::Vote(vote) => vote.timestamp = ts,
+            VoteTransaction::VoteStateUpdate(vote_state_update) => vote_state_update.timestamp = ts,
+        }
+    }
+
+    pub fn last_voted_slot(&self) -> Option<Slot> {
+        match self {
+            VoteTransaction::Vote(vote) => vote.slots.last().copied(),
+            VoteTransaction::VoteStateUpdate(vote_state_update) => {
+                Some(vote_state_update.lockouts.back()?.slot)
+            }
+        }
+    }
+
+    pub fn last_voted_slot_hash(&self) -> Option<(Slot, Hash)> {
+        Some((self.last_voted_slot()?, self.hash()))
+    }
+}
+
+impl From<Vote> for VoteTransaction {
+    fn from(vote: Vote) -> Self {
+        VoteTransaction::Vote(vote)
+    }
+}
+
+impl From<VoteStateUpdate> for VoteTransaction {
+    fn from(vote_state_update: VoteStateUpdate) -> Self {
+        VoteTransaction::VoteStateUpdate(vote_state_update)
+    }
+}
+
 #[frozen_abi(digest = "Ch2vVEwos2EjAVqSHCyJjnN2MNX1yrpapZTGhMSCjWUH")]
 #[derive(Serialize, Default, Deserialize, Debug, PartialEq, Eq, Clone, AbiExample)]
 pub struct Vote {
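The enum above gives tower and replay code a single type that can hold either vote format. A small usage sketch of the accessors just defined (illustrative only; the asserts restate behavior visible in the match arms above):

```rust
use {
    solana_sdk::hash::Hash,
    solana_vote_program::vote_state::{Vote, VoteStateUpdate, VoteTransaction},
};

fn main() {
    // Wrap a legacy vote; the accessors delegate to `Vote`'s fields.
    let mut tx = VoteTransaction::from(Vote::new(vec![1, 2, 3], Hash::default()));
    assert_eq!(tx.slots(), vec![1, 2, 3]);
    assert_eq!(tx.last_voted_slot(), Some(3));
    assert_eq!(tx.last_voted_slot_hash(), Some((3, Hash::default())));
    tx.set_timestamp(Some(42));

    // The same interface over the new variant; an empty update has no last vote.
    let update = VoteTransaction::from(VoteStateUpdate::default());
    assert!(update.is_empty());
    assert_eq!(update.last_voted_slot(), None);
}
```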
@@ -93,6 +179,7 @@ impl Lockout {
     }
 }
 
+#[frozen_abi(digest = "BctadFJjUKbvPJzr6TszbX6rBfQUNSRKpKKngkzgXgeY")]
 #[derive(Serialize, Default, Deserialize, Debug, PartialEq, Eq, Clone, AbiExample)]
 pub struct VoteStateUpdate {
     /// The proposed tower
@@ -132,6 +219,10 @@ impl VoteStateUpdate {
             timestamp: None,
         }
     }
+
+    pub fn slots(&self) -> Vec<Slot> {
+        self.lockouts.iter().map(|lockout| lockout.slot).collect()
+    }
 }
 
 #[derive(Default, Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy)]
@@ -382,6 +473,12 @@ impl VoteState {
        // to the current vote state root for safety.
        if earliest_slot_hash_in_history > new_proposed_root {
            vote_state_update.root = self.root_slot;
+        } else if !slot_hashes
+            // Verify that the root is in slot hashes
+            .iter()
+            .any(|&(slot, _)| slot == new_proposed_root)
+        {
+            return Err(VoteError::RootOnDifferentFork);
        }
    }
 
@@ -779,7 +876,6 @@
         if vote.slots.is_empty() {
             return Err(VoteError::EmptySlots);
         }
-
         let filtered_vote_slots = feature_set.and_then(|feature_set| {
             if feature_set.is_active(&filter_votes_outside_slot_hashes::id()) {
                 let earliest_slot_in_history =
@@ -1537,8 +1633,10 @@ mod tests {
         let versioned = VoteStateVersions::new_current(vote_state);
         assert!(VoteState::serialize(&versioned, &mut buffer[0..4]).is_err());
         VoteState::serialize(&versioned, &mut buffer).unwrap();
-        let des = VoteState::deserialize(&buffer).unwrap();
-        assert_eq!(des, versioned.convert_to_current(),);
+        assert_eq!(
+            VoteState::deserialize(&buffer).unwrap(),
+            versioned.convert_to_current()
+        );
     }
 
     #[test]
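For reference, the root check added in the `impl VoteState` hunk above can be restated as a stand-alone rule: a proposed root that is too old to appear in `SlotHashes` falls back to the current root, while a recent one must actually be present in the history or the update is rejected. This is a simplified sketch of that rule (hypothetical helper, not the code the patch adds):

```rust
use solana_sdk::{clock::Slot, hash::Hash};

/// `Ok(None)` keeps the current root, `Ok(Some(_))` accepts the proposed root,
/// and `Err(_)` mirrors `VoteError::RootOnDifferentFork`.
fn check_proposed_root(
    proposed_root: Slot,
    current_root: Option<Slot>,
    slot_hashes: &[(Slot, Hash)],
) -> Result<Option<Slot>, &'static str> {
    // SlotHashes is ordered newest first, so the last entry is the oldest slot
    // still covered by the history.
    let earliest_slot_hash_in_history = match slot_hashes.last() {
        Some((slot, _)) => *slot,
        None => return Err("empty SlotHashes"),
    };
    if earliest_slot_hash_in_history > proposed_root {
        // Too old to verify against the history: keep the current root for safety.
        Ok(current_root)
    } else if slot_hashes.iter().any(|&(slot, _)| slot == proposed_root) {
        Ok(Some(proposed_root))
    } else {
        // Recent enough that it should be in the history but is not: it must
        // belong to a different fork.
        Err("RootOnDifferentFork: proposed root is not in slot hashes")
    }
}
```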