Add feature gate for new vote instruction and plumb through replay (#21683)

* Add feature gate for new vote instruction and plumb through replay

Add tower versions

* Add check for slot hashes history

* Update is_recent check to exclude voting on hard fork root slot

* Move tower rollback test to flaky and ignore it until #22551 lands
Ashwin Sekar 2022-02-07 14:06:19 -08:00 committed by GitHub
parent d7fcfee4db
commit 5acf0f6331
11 changed files with 840 additions and 332 deletions
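The commit message above describes the core change: when the `allow_votes_to_directly_update_vote_state` feature gate is active, replay builds a `VoteStateUpdate` transaction carrying the whole local tower instead of an incremental `Vote`. A minimal sketch of that decision point, condensed from `record_bank_vote_and_update_lockouts` in the diff below (this is not part of the diff; the surrounding `bank`, `tower`, `vote_slot`, and `vote_hash` bindings are assumed, and the legacy branch is simplified):

    let new_vote = if Tower::is_direct_vote_state_update_enabled(bank) {
        // New path: apply the vote locally, then resend the full vote state.
        tower.vote_state.process_vote_unchecked(Vote::new(vec![vote_slot], vote_hash));
        VoteTransaction::from(VoteStateUpdate::new(
            tower.vote_state.votes.clone(),
            tower.vote_state.root_slot,
            vote_hash,
        ))
    } else {
        // Legacy path: only a diff against what already landed is sent
        // (see apply_vote_and_generate_vote_diff in the diff below).
        VoteTransaction::from(Vote::new(vec![vote_slot], vote_hash))
    };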


@ -1,9 +1,10 @@
use crate::tower1_7_14::Tower1_7_14;
use {
crate::{
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
progress_map::{LockoutIntervals, ProgressMap},
tower_storage::{SavedTower, TowerStorage},
tower_storage::{SavedTower, SavedTowerVersions, TowerStorage},
},
chrono::prelude::*,
solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db},
@ -13,6 +14,7 @@ use {
},
solana_sdk::{
clock::{Slot, UnixTimestamp},
feature_set,
hash::Hash,
instruction::Instruction,
pubkey::Pubkey,
@ -21,7 +23,10 @@ use {
},
solana_vote_program::{
vote_instruction,
vote_state::{BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY},
vote_state::{
BlockTimestamp, Lockout, Vote, VoteState, VoteStateUpdate, VoteTransaction,
MAX_LOCKOUT_HISTORY,
},
},
std::{
cmp::Ordering,
@ -45,29 +50,43 @@ pub enum SwitchForkDecision {
impl SwitchForkDecision {
pub fn to_vote_instruction(
&self,
vote: Vote,
vote: VoteTransaction,
vote_account_pubkey: &Pubkey,
authorized_voter_pubkey: &Pubkey,
) -> Option<Instruction> {
match self {
SwitchForkDecision::FailedSwitchThreshold(_, total_stake) => {
match (self, vote) {
(SwitchForkDecision::FailedSwitchThreshold(_, total_stake), _) => {
assert_ne!(*total_stake, 0);
None
}
SwitchForkDecision::FailedSwitchDuplicateRollback(_) => None,
SwitchForkDecision::SameFork => Some(vote_instruction::vote(
vote_account_pubkey,
authorized_voter_pubkey,
vote,
)),
SwitchForkDecision::SwitchProof(switch_proof_hash) => {
(SwitchForkDecision::FailedSwitchDuplicateRollback(_), _) => None,
(SwitchForkDecision::SameFork, VoteTransaction::Vote(v)) => Some(
vote_instruction::vote(vote_account_pubkey, authorized_voter_pubkey, v),
),
(SwitchForkDecision::SameFork, VoteTransaction::VoteStateUpdate(v)) => {
Some(vote_instruction::update_vote_state(
vote_account_pubkey,
authorized_voter_pubkey,
v,
))
}
(SwitchForkDecision::SwitchProof(switch_proof_hash), VoteTransaction::Vote(v)) => {
Some(vote_instruction::vote_switch(
vote_account_pubkey,
authorized_voter_pubkey,
vote,
v,
*switch_proof_hash,
))
}
(
SwitchForkDecision::SwitchProof(switch_proof_hash),
VoteTransaction::VoteStateUpdate(v),
) => Some(vote_instruction::update_vote_state_switch(
vote_account_pubkey,
authorized_voter_pubkey,
v,
*switch_proof_hash,
)),
}
}
@ -102,14 +121,47 @@ pub(crate) struct ComputedBankState {
pub my_latest_landed_vote: Option<Slot>,
}
#[frozen_abi(digest = "GMs1FxKteU7K4ZFRofMBqNhBpM4xkPVxfYod6R8DQmpT")]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub enum TowerVersions {
V1_17_14(Tower1_7_14),
Current(Tower),
}
impl TowerVersions {
pub fn new_current(tower: Tower) -> Self {
Self::Current(tower)
}
pub fn convert_to_current(self) -> Tower {
match self {
TowerVersions::V1_17_14(tower) => {
let box_last_vote = VoteTransaction::from(tower.last_vote.clone());
Tower {
node_pubkey: tower.node_pubkey,
threshold_depth: tower.threshold_depth,
threshold_size: tower.threshold_size,
vote_state: tower.vote_state,
last_vote: box_last_vote,
last_vote_tx_blockhash: tower.last_vote_tx_blockhash,
last_timestamp: tower.last_timestamp,
stray_restored_slot: tower.stray_restored_slot,
last_switch_threshold_check: tower.last_switch_threshold_check,
}
}
TowerVersions::Current(tower) => tower,
}
}
}
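Either variant can then be normalized before use; roughly (a one-line sketch, assuming `old_tower` is a deserialized `Tower1_7_14`):

    // Wrap a legacy tower and convert it to the current layout; last_vote becomes a VoteTransaction.
    let tower: Tower = TowerVersions::V1_17_14(old_tower).convert_to_current();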
#[frozen_abi(digest = "BfeSJNsfQeX6JU7dmezv1s1aSvR5SoyxKRRZ4ubTh2mt")]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)]
pub struct Tower {
pub node_pubkey: Pubkey,
threshold_depth: usize,
threshold_size: f64,
vote_state: VoteState,
last_vote: Vote,
pub(crate) vote_state: VoteState,
last_vote: VoteTransaction,
#[serde(skip)]
// The blockhash used in the last vote transaction may or may not equal the
// blockhash of the voted block itself, depending on whether the vote slot was refreshed.
@ -136,7 +188,7 @@ impl Default for Tower {
threshold_depth: VOTE_THRESHOLD_DEPTH,
threshold_size: VOTE_THRESHOLD_SIZE,
vote_state: VoteState::default(),
last_vote: Vote::default(),
last_vote: VoteTransaction::from(VoteStateUpdate::default()),
last_timestamp: BlockTimestamp::default(),
last_vote_tx_blockhash: Hash::default(),
stray_restored_slot: Option::default(),
@ -359,12 +411,18 @@ impl Tower {
self.last_vote_tx_blockhash = new_vote_tx_blockhash;
}
// Returns true if we have switched to the new vote instruction that directly sets vote state
pub(crate) fn is_direct_vote_state_update_enabled(bank: &Bank) -> bool {
bank.feature_set
.is_active(&feature_set::allow_votes_to_directly_update_vote_state::id())
}
fn apply_vote_and_generate_vote_diff(
local_vote_state: &mut VoteState,
slot: Slot,
hash: Hash,
last_voted_slot_in_bank: Option<Slot>,
) -> Vote {
) -> VoteTransaction {
let vote = Vote::new(vec![slot], hash);
local_vote_state.process_vote_unchecked(vote);
let slots = if let Some(last_voted_slot) = last_voted_slot_in_bank {
@ -377,7 +435,7 @@ impl Tower {
} else {
local_vote_state.votes.iter().map(|v| v.slot).collect()
};
Vote::new(slots, hash)
VoteTransaction::from(Vote::new(slots, hash))
}
pub fn last_voted_slot_in_bank(bank: &Bank, vote_account_pubkey: &Pubkey) -> Option<Slot> {
@ -391,7 +449,12 @@ impl Tower {
// Returns the new root if one is made after applying a vote for the given bank to
// `self.vote_state`
self.record_bank_vote_and_update_lockouts(bank.slot(), bank.hash(), last_voted_slot_in_bank)
self.record_bank_vote_and_update_lockouts(
bank.slot(),
bank.hash(),
last_voted_slot_in_bank,
Self::is_direct_vote_state_update_enabled(bank),
)
}
fn record_bank_vote_and_update_lockouts(
@ -399,18 +462,29 @@ impl Tower {
vote_slot: Slot,
vote_hash: Hash,
last_voted_slot_in_bank: Option<Slot>,
is_direct_vote_state_update_enabled: bool,
) -> Option<Slot> {
trace!("{} record_vote for {}", self.node_pubkey, vote_slot);
let old_root = self.root();
let mut new_vote = Self::apply_vote_and_generate_vote_diff(
&mut self.vote_state,
vote_slot,
vote_hash,
last_voted_slot_in_bank,
);
new_vote.timestamp =
self.maybe_timestamp(self.last_vote.slots.last().copied().unwrap_or_default());
let mut new_vote = if is_direct_vote_state_update_enabled {
let vote = Vote::new(vec![vote_slot], vote_hash);
self.vote_state.process_vote_unchecked(vote);
VoteTransaction::from(VoteStateUpdate::new(
self.vote_state.votes.clone(),
self.vote_state.root_slot,
vote_hash,
))
} else {
Self::apply_vote_and_generate_vote_diff(
&mut self.vote_state,
vote_slot,
vote_hash,
last_voted_slot_in_bank,
)
};
new_vote.set_timestamp(self.maybe_timestamp(self.last_voted_slot().unwrap_or_default()));
self.last_vote = new_vote;
let new_root = self.root();
@ -429,7 +503,7 @@ impl Tower {
#[cfg(test)]
pub fn record_vote(&mut self, slot: Slot, hash: Hash) -> Option<Slot> {
self.record_bank_vote_and_update_lockouts(slot, hash, self.last_voted_slot())
self.record_bank_vote_and_update_lockouts(slot, hash, self.last_voted_slot(), true)
}
/// Used for tests
@ -440,18 +514,22 @@ impl Tower {
}
pub fn last_voted_slot(&self) -> Option<Slot> {
self.last_vote.slots.last().copied()
if self.last_vote.is_empty() {
None
} else {
Some(self.last_vote.slot(self.last_vote.len() - 1))
}
}
pub fn last_voted_slot_hash(&self) -> Option<(Slot, Hash)> {
Some((*self.last_vote.slots.last()?, self.last_vote.hash))
Some((self.last_voted_slot()?, self.last_vote.hash()))
}
pub fn stray_restored_slot(&self) -> Option<Slot> {
self.stray_restored_slot
}
pub fn last_vote(&self) -> Vote {
pub fn last_vote(&self) -> VoteTransaction {
self.last_vote.clone()
}
@ -482,12 +560,17 @@ impl Tower {
self.vote_state.root_slot.unwrap()
}
// a slot is recent if it's newer than the last vote we have
// a slot is recent if it's newer than the last vote we have. If we haven't voted yet
// but have a root (hard forks situation) then compare it to the root
pub fn is_recent(&self, slot: Slot) -> bool {
if let Some(last_voted_slot) = self.vote_state.last_voted_slot() {
if slot <= last_voted_slot {
return false;
}
} else if let Some(root) = self.vote_state.root_slot {
if slot <= root {
return false;
}
}
true
}
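With the hard-fork change above, a tower that carries only a root (no votes yet) now also rejects slots at or below that root. A test-style sketch using crate-internal helpers, not taken from the diff (the slot numbers are arbitrary):

    let mut tower = Tower::new_for_tests(0, 0.67);
    // Hypothetical hard fork scenario: a root was set at restart, nothing voted on yet.
    tower.vote_state.root_slot = Some(100);
    assert!(!tower.is_recent(100)); // the root itself is no longer votable
    assert!(tower.is_recent(101)); // anything newer still is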
@ -583,9 +666,9 @@ impl Tower {
// `switch < last` is needed not to warn! this message just because of using
// newer snapshots on validator restart
let message = format!(
"bank_forks doesn't have corresponding data for the stray restored \
"bank_forks doesn't have corresponding data for the stray restored \
last vote({}), meaning some inconsistency between saved tower and ledger.",
last_voted_slot
last_voted_slot
);
warn!("{}", message);
datapoint_warn!("tower_warn", ("warn", message, String));
@ -627,42 +710,56 @@ impl Tower {
// TODO: Handle if the last vote is on a dupe, and then we restart. The dupe won't be in
// heaviest_subtree_fork_choice, so `heaviest_subtree_fork_choice.latest_invalid_ancestor()` will return
// None, but the last vote will be persisted in tower.
let switch_hash = progress.get_hash(switch_slot).expect("Slot we're trying to switch to must exist AND be frozen in progress map");
if let Some(latest_duplicate_ancestor) = heaviest_subtree_fork_choice.latest_invalid_ancestor(&(last_voted_slot, last_voted_hash)) {
let switch_hash = progress
.get_hash(switch_slot)
.expect("Slot we're trying to switch to must exist AND be frozen in progress map");
if let Some(latest_duplicate_ancestor) = heaviest_subtree_fork_choice
.latest_invalid_ancestor(&(last_voted_slot, last_voted_hash))
{
// We're rolling back because one of the ancestors of the last vote was a duplicate. In this
// case, it's acceptable if the switch candidate is one of ancestors of the previous vote,
// just fail the switch check because there's no point in voting on an ancestor. ReplayStage
// should then have a special case continue building an alternate fork from this ancestor, NOT
// the `last_voted_slot`. This is in contrast to usual SwitchFailure where ReplayStage continues to build blocks
// on latest vote. See `ReplayStage::select_vote_and_reset_forks()` for more details.
if heaviest_subtree_fork_choice.is_strict_ancestor(&(switch_slot, switch_hash), &(last_voted_slot, last_voted_hash)) {
if heaviest_subtree_fork_choice.is_strict_ancestor(
&(switch_slot, switch_hash),
&(last_voted_slot, last_voted_hash),
) {
return rollback_due_to_to_to_duplicate_ancestor(latest_duplicate_ancestor);
} else if progress.get_hash(last_voted_slot).map(|current_slot_hash| current_slot_hash != last_voted_hash).unwrap_or(true) {
} else if progress
.get_hash(last_voted_slot)
.map(|current_slot_hash| current_slot_hash != last_voted_hash)
.unwrap_or(true)
{
// Our last vote slot was purged because it was on a duplicate fork, don't continue below
// where checks may panic. We allow a freebie vote here that may violate switching
// thresholds
// TODO: Properly handle this case
info!("Allowing switch vote on {:?} because last vote {:?} was rolled back", (switch_slot, switch_hash), (last_voted_slot, last_voted_hash));
info!(
"Allowing switch vote on {:?} because last vote {:?} was rolled back",
(switch_slot, switch_hash),
(last_voted_slot, last_voted_hash)
);
return SwitchForkDecision::SwitchProof(Hash::default());
}
}
let last_vote_ancestors =
ancestors.get(&last_voted_slot).unwrap_or_else(|| {
if self.is_stray_last_vote() {
// Unless last vote is stray and stale, ancestors.get(last_voted_slot) must
// return Some(_), justifying to panic! here.
// Also, adjust_lockouts_after_replay() correctly makes last_voted_slot None,
// if all saved votes are ancestors of replayed_root_slot. So this code shouldn't be
// touched in that case as well.
// In other words, except being stray, all other slots have been voted on while
// this validator has been running, so we must be able to fetch ancestors for
// all of them.
empty_ancestors_due_to_minor_unsynced_ledger()
} else {
panic!("no ancestors found with slot: {}", last_voted_slot);
}
});
let last_vote_ancestors = ancestors.get(&last_voted_slot).unwrap_or_else(|| {
if self.is_stray_last_vote() {
// Unless last vote is stray and stale, ancestors.get(last_voted_slot) must
// return Some(_), justifying to panic! here.
// Also, adjust_lockouts_after_replay() correctly makes last_voted_slot None,
// if all saved votes are ancestors of replayed_root_slot. So this code shouldn't be
// touched in that case as well.
// In other words, except being stray, all other slots have been voted on while
// this validator has been running, so we must be able to fetch ancestors for
// all of them.
empty_ancestors_due_to_minor_unsynced_ledger()
} else {
panic!("no ancestors found with slot: {}", last_voted_slot);
}
});
let switch_slot_ancestors = ancestors.get(&switch_slot).unwrap();
@ -734,22 +831,76 @@ impl Tower {
// Find any locked out intervals for vote accounts in this bank with
// `lockout_interval_end` >= `last_vote`, which implies they are locked out at
// `last_vote` on another fork.
for (_lockout_interval_end, intervals_keyed_by_end) in lockout_intervals.range((Included(last_voted_slot), Unbounded)) {
for (lockout_interval_start, vote_account_pubkey) in intervals_keyed_by_end {
if locked_out_vote_accounts.contains(vote_account_pubkey) {
continue;
}
for (_lockout_interval_end, intervals_keyed_by_end) in
lockout_intervals.range((Included(last_voted_slot), Unbounded))
{
for (lockout_interval_start, vote_account_pubkey) in intervals_keyed_by_end {
if locked_out_vote_accounts.contains(vote_account_pubkey) {
continue;
}
// Only count lockouts on slots that are:
// 1) Not ancestors of `last_vote`, meaning being on different fork
// 2) Not from before the current root as we can't determine if
// anything before the root was an ancestor of `last_vote` or not
if !last_vote_ancestors.contains(lockout_interval_start)
// Given a `lockout_interval_start` < root that appears in a
// bank for a `candidate_slot`, it must be that `lockout_interval_start`
// is an ancestor of the current root, because `candidate_slot` is a
// descendant of the current root
&& *lockout_interval_start > root
// Only count lockouts on slots that are:
// 1) Not ancestors of `last_vote`, meaning being on different fork
// 2) Not from before the current root as we can't determine if
// anything before the root was an ancestor of `last_vote` or not
if !last_vote_ancestors.contains(lockout_interval_start)
// Given a `lockout_interval_start` < root that appears in a
// bank for a `candidate_slot`, it must be that `lockout_interval_start`
// is an ancestor of the current root, because `candidate_slot` is a
// descendant of the current root
&& *lockout_interval_start > root
{
let stake = epoch_vote_accounts
.get(vote_account_pubkey)
.map(|(stake, _)| *stake)
.unwrap_or(0);
locked_out_stake += stake;
if (locked_out_stake as f64 / total_stake as f64)
> SWITCH_FORK_THRESHOLD
{
return SwitchForkDecision::SwitchProof(switch_proof);
}
locked_out_vote_accounts.insert(vote_account_pubkey);
}
}
}
}
// Check the latest votes for potentially gossip votes that haven't landed yet
for (
vote_account_pubkey,
(candidate_latest_frozen_vote, _candidate_latest_frozen_vote_hash),
) in latest_validator_votes_for_frozen_banks.max_gossip_frozen_votes()
{
if locked_out_vote_accounts.contains(&vote_account_pubkey) {
continue;
}
if *candidate_latest_frozen_vote > last_voted_slot
&&
// Because `candidate_latest_frozen_vote` is the last vote made by some validator
// in the cluster for a frozen bank `B` observed through gossip, we may have cleared
// that frozen bank `B` because we `set_root(root)` for a `root` on a different fork,
// like so:
//
// |----------X ------candidate_latest_frozen_vote (frozen)
// old root
// |----------new root ----last_voted_slot
//
// In most cases, because `last_voted_slot` must be a descendant of `root`, then
// if `candidate_latest_frozen_vote` is not found in the ancestors/descendants map (recall these
// directly reflect the state of BankForks), this implies that `B` was pruned from BankForks
// because it was on a different fork than `last_voted_slot`, and thus this vote for `candidate_latest_frozen_vote`
// should be safe to count towards the switching proof:
//
// However, there is also the possibility that `last_voted_slot` is a stray, in which
// case we cannot make this conclusion as we do not know the ancestors/descendants
// of strays. Hence we err on the side of caution here and ignore this vote. This
// is ok because validators voting on different unrooted forks should eventually vote
// on some descendant of the root, at which time they can be included in switching proofs.
!Self::is_candidate_slot_descendant_of_last_vote(
*candidate_latest_frozen_vote, last_voted_slot, ancestors)
.unwrap_or(true)
{
let stake = epoch_vote_accounts
.get(vote_account_pubkey)
@ -761,58 +912,13 @@ impl Tower {
}
locked_out_vote_accounts.insert(vote_account_pubkey);
}
}
}
}
// Check the latest votes for potentially gossip votes that haven't landed yet
for (vote_account_pubkey, (candidate_latest_frozen_vote, _candidate_latest_frozen_vote_hash)) in latest_validator_votes_for_frozen_banks.max_gossip_frozen_votes() {
if locked_out_vote_accounts.contains(&vote_account_pubkey) {
continue;
}
if *candidate_latest_frozen_vote > last_voted_slot
&&
// Because `candidate_latest_frozen_vote` is the last vote made by some validator
// in the cluster for a frozen bank `B` observed through gossip, we may have cleared
// that frozen bank `B` because we `set_root(root)` for a `root` on a different fork,
// like so:
//
// |----------X ------candidate_latest_frozen_vote (frozen)
// old root
// |----------new root ----last_voted_slot
//
// In most cases, because `last_voted_slot` must be a descendant of `root`, then
// if `candidate_latest_frozen_vote` is not found in the ancestors/descendants map (recall these
// directly reflect the state of BankForks), this implies that `B` was pruned from BankForks
// because it was on a different fork than `last_voted_slot`, and thus this vote for `candidate_latest_frozen_vote`
// should be safe to count towards the switching proof:
//
// However, there is also the possibility that `last_voted_slot` is a stray, in which
// case we cannot make this conclusion as we do not know the ancestors/descendants
// of strays. Hence we err on the side of caution here and ignore this vote. This
// is ok because validators voting on different unrooted forks should eventually vote
// on some descendant of the root, at which time they can be included in switching proofs.
!Self::is_candidate_slot_descendant_of_last_vote(
*candidate_latest_frozen_vote, last_voted_slot, ancestors)
.unwrap_or(true) {
let stake = epoch_vote_accounts
.get(vote_account_pubkey)
.map(|(stake, _)| *stake)
.unwrap_or(0);
locked_out_stake += stake;
if (locked_out_stake as f64 / total_stake as f64) > SWITCH_FORK_THRESHOLD {
return SwitchForkDecision::SwitchProof(switch_proof);
}
locked_out_vote_accounts.insert(vote_account_pubkey);
}
}
// We have not detected sufficient lockout past the last voted slot to generate
// a switching proof
SwitchForkDecision::FailedSwitchThreshold(locked_out_stake, total_stake)
})
.unwrap_or(SwitchForkDecision::SameFork)
.unwrap_or(SwitchForkDecision::SameFork)
}
#[allow(clippy::too_many_arguments)]
@ -933,13 +1039,7 @@ impl Tower {
}
pub fn is_stray_last_vote(&self) -> bool {
if let Some(last_voted_slot) = self.last_voted_slot() {
if let Some(stray_restored_slot) = self.stray_restored_slot {
return stray_restored_slot == last_voted_slot;
}
}
false
self.stray_restored_slot == self.last_voted_slot()
}
// The tower root can be older/newer if the validator booted from a newer/older snapshot, so
@ -961,8 +1061,10 @@ impl Tower {
assert_eq!(slot_history.check(replayed_root), Check::Found);
assert!(
self.last_vote == Vote::default() && self.vote_state.votes.is_empty()
|| self.last_vote != Vote::default() && !self.vote_state.votes.is_empty(),
self.last_vote == VoteTransaction::from(VoteStateUpdate::default())
&& self.vote_state.votes.is_empty()
|| self.last_vote != VoteTransaction::from(VoteStateUpdate::default())
&& !self.vote_state.votes.is_empty(),
"last vote: {:?} vote_state.votes: {:?}",
self.last_vote,
self.vote_state.votes
@ -1116,7 +1218,7 @@ impl Tower {
info!("All restored votes were behind; resetting root_slot and last_vote in tower!");
// we might not have banks for those votes so just reset.
// That's because the votes may well be past replayed_root
self.last_vote = Vote::default();
self.last_vote = VoteTransaction::from(Vote::default());
} else {
info!(
"{} restored votes (out of {}) were on different fork or are upcoming votes on unrooted slots: {:?}!",
@ -1125,11 +1227,8 @@ impl Tower {
self.voted_slots()
);
assert_eq!(
self.last_vote.slots.last().unwrap(),
self.voted_slots().last().unwrap()
);
self.stray_restored_slot = Some(*self.last_vote.slots.last().unwrap());
assert_eq!(self.last_voted_slot(), self.voted_slots().last().copied());
self.stray_restored_slot = self.last_vote.last_voted_slot()
}
Ok(())
@ -1176,13 +1275,12 @@ impl Tower {
pub fn save(&self, tower_storage: &dyn TowerStorage, node_keypair: &Keypair) -> Result<()> {
let saved_tower = SavedTower::new(self, node_keypair)?;
tower_storage.store(&saved_tower)?;
tower_storage.store(&SavedTowerVersions::from(saved_tower))?;
Ok(())
}
pub fn restore(tower_storage: &dyn TowerStorage, node_pubkey: &Pubkey) -> Result<Self> {
let saved_tower = tower_storage.load(node_pubkey)?;
saved_tower.try_into_tower(node_pubkey)
tower_storage.load(node_pubkey)
}
}
@ -1290,7 +1388,7 @@ pub mod test {
},
solana_vote_program::vote_state::{Vote, VoteStateVersions, MAX_LOCKOUT_HISTORY},
std::{
collections::HashMap,
collections::{HashMap, VecDeque},
fs::{remove_file, OpenOptions},
io::{Read, Seek, SeekFrom, Write},
path::PathBuf,
@ -1331,17 +1429,29 @@ pub mod test {
let vote = Vote::default();
let mut decision = SwitchForkDecision::FailedSwitchThreshold(0, 1);
assert!(decision
.to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default())
.to_vote_instruction(
VoteTransaction::from(vote.clone()),
&Pubkey::default(),
&Pubkey::default()
)
.is_none());
decision = SwitchForkDecision::FailedSwitchDuplicateRollback(0);
assert!(decision
.to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default())
.to_vote_instruction(
VoteTransaction::from(vote.clone()),
&Pubkey::default(),
&Pubkey::default()
)
.is_none());
decision = SwitchForkDecision::SameFork;
assert_eq!(
decision.to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default()),
decision.to_vote_instruction(
VoteTransaction::from(vote.clone()),
&Pubkey::default(),
&Pubkey::default()
),
Some(vote_instruction::vote(
&Pubkey::default(),
&Pubkey::default(),
@ -1351,7 +1461,11 @@ pub mod test {
decision = SwitchForkDecision::SwitchProof(Hash::default());
assert_eq!(
decision.to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default()),
decision.to_vote_instruction(
VoteTransaction::from(vote.clone()),
&Pubkey::default(),
&Pubkey::default()
),
Some(vote_instruction::vote_switch(
&Pubkey::default(),
&Pubkey::default(),
@ -1373,7 +1487,7 @@ pub mod test {
// Set the voting behavior
let mut cluster_votes = HashMap::new();
let votes = vec![0, 1, 2, 3, 4, 5];
let votes = vec![1, 2, 3, 4, 5];
cluster_votes.insert(node_pubkey, votes.clone());
vote_simulator.fill_bank_forks(forks, &cluster_votes, true);
@ -1384,9 +1498,12 @@ pub mod test {
.is_empty());
}
for i in 0..5 {
assert_eq!(tower.vote_state.votes[i].slot as usize, i);
assert_eq!(tower.vote_state.votes[i].confirmation_count as usize, 6 - i);
for i in 1..5 {
assert_eq!(tower.vote_state.votes[i - 1].slot as usize, i);
assert_eq!(
tower.vote_state.votes[i - 1].confirmation_count as usize,
6 - i
);
}
}
@ -1846,7 +1963,7 @@ pub mod test {
/ (tr(44)
// Minor fork 2
/ (tr(45) / (tr(46))))
/ (tr(110)))));
/ (tr(110)))));
// Have two validators, each representing 20% of the stake vote on
// minor fork 2 at slots 46 + 47
@ -1918,7 +2035,7 @@ pub mod test {
let mut my_votes: Vec<Slot> = vec![];
let next_unlocked_slot = 110;
// Vote on the first minor fork
my_votes.extend(0..=14);
my_votes.extend(1..=14);
// Come back to the main fork
my_votes.extend(43..=44);
// Vote on the second minor fork
@ -2107,8 +2224,8 @@ pub mod test {
#[test]
fn test_is_locked_out_empty() {
let tower = Tower::new_for_tests(0, 0.67);
let ancestors = HashSet::new();
assert!(!tower.is_locked_out(0, &ancestors));
let ancestors = HashSet::from([0]);
assert!(!tower.is_locked_out(1, &ancestors));
}
#[test]
@ -2139,7 +2256,7 @@ pub mod test {
#[test]
fn test_check_recent_slot() {
let mut tower = Tower::new_for_tests(0, 0.67);
assert!(tower.is_recent(0));
assert!(tower.is_recent(1));
assert!(tower.is_recent(32));
for i in 0..64 {
tower.record_vote(i, Hash::default());
@ -2254,7 +2371,7 @@ pub mod test {
let mut local = VoteState::default();
let vote = Tower::apply_vote_and_generate_vote_diff(&mut local, 0, Hash::default(), None);
assert_eq!(local.votes.len(), 1);
assert_eq!(vote.slots, vec![0]);
assert_eq!(vote.slots(), vec![0]);
assert_eq!(local.tower(), vec![0]);
}
@ -2265,7 +2382,7 @@ pub mod test {
// another vote for slot 0 should return an empty vote as the diff.
let vote =
Tower::apply_vote_and_generate_vote_diff(&mut local, 0, Hash::default(), Some(0));
assert!(vote.slots.is_empty());
assert!(vote.is_empty());
}
#[test]
@ -2280,7 +2397,7 @@ pub mod test {
assert_eq!(local.votes.len(), 1);
let vote =
Tower::apply_vote_and_generate_vote_diff(&mut local, 1, Hash::default(), Some(0));
assert_eq!(vote.slots, vec![1]);
assert_eq!(vote.slots(), vec![1]);
assert_eq!(local.tower(), vec![0, 1]);
}
@ -2300,7 +2417,7 @@ pub mod test {
// observable in any of the results.
let vote =
Tower::apply_vote_and_generate_vote_diff(&mut local, 3, Hash::default(), Some(0));
assert_eq!(vote.slots, vec![3]);
assert_eq!(vote.slots(), vec![3]);
assert_eq!(local.tower(), vec![3]);
}
@ -2373,17 +2490,26 @@ pub mod test {
fn vote_and_check_recent(num_votes: usize) {
let mut tower = Tower::new_for_tests(1, 0.67);
let slots = if num_votes > 0 {
vec![num_votes as u64 - 1]
{ 0..num_votes }
.map(|i| Lockout {
slot: i as u64,
confirmation_count: (num_votes as u32) - (i as u32),
})
.collect()
} else {
vec![]
};
let mut expected = Vote::new(slots, Hash::default());
let mut expected = VoteStateUpdate::new(
VecDeque::from(slots),
if num_votes > 0 { Some(0) } else { None },
Hash::default(),
);
for i in 0..num_votes {
tower.record_vote(i as u64, Hash::default());
}
expected.timestamp = tower.last_vote.timestamp;
assert_eq!(expected, tower.last_vote)
expected.timestamp = tower.last_vote.timestamp();
assert_eq!(VoteTransaction::from(expected), tower.last_vote)
}
#[test]
@ -2683,10 +2809,12 @@ pub mod test {
.write(true)
.open(path)
.unwrap();
// 4 is the offset into SavedTowerVersions for the signature
assert_eq!(file.seek(SeekFrom::Start(4)).unwrap(), 4);
let mut buf = [0u8];
assert_eq!(file.read(&mut buf).unwrap(), 1);
buf[0] = !buf[0];
assert_eq!(file.seek(SeekFrom::Start(0)).unwrap(), 0);
assert_eq!(file.seek(SeekFrom::Start(4)).unwrap(), 4);
assert_eq!(file.write(&buf).unwrap(), 1);
},
);
@ -3025,7 +3153,7 @@ pub mod test {
tower.vote_state.votes.push_back(Lockout::new(1));
tower.vote_state.votes.push_back(Lockout::new(0));
let vote = Vote::new(vec![0], Hash::default());
tower.last_vote = vote;
tower.last_vote = VoteTransaction::from(vote);
let mut slot_history = SlotHistory::default();
slot_history.add(0);
@ -3043,7 +3171,7 @@ pub mod test {
tower.vote_state.votes.push_back(Lockout::new(1));
tower.vote_state.votes.push_back(Lockout::new(2));
let vote = Vote::new(vec![2], Hash::default());
tower.last_vote = vote;
tower.last_vote = VoteTransaction::from(vote);
let mut slot_history = SlotHistory::default();
slot_history.add(0);
@ -3068,7 +3196,7 @@ pub mod test {
tower.vote_state.votes.push_back(Lockout::new(0));
tower.vote_state.votes.push_back(Lockout::new(1));
let vote = Vote::new(vec![1], Hash::default());
tower.last_vote = vote;
tower.last_vote = VoteTransaction::from(vote);
let mut slot_history = SlotHistory::default();
slot_history.add(MAX_ENTRIES);
@ -3087,7 +3215,7 @@ pub mod test {
tower.vote_state.votes.push_back(Lockout::new(2));
tower.vote_state.votes.push_back(Lockout::new(1));
let vote = Vote::new(vec![1], Hash::default());
tower.last_vote = vote;
tower.last_vote = VoteTransaction::from(vote);
let mut slot_history = SlotHistory::default();
slot_history.add(0);
@ -3106,7 +3234,7 @@ pub mod test {
tower.vote_state.votes.push_back(Lockout::new(3));
tower.vote_state.votes.push_back(Lockout::new(3));
let vote = Vote::new(vec![3], Hash::default());
tower.last_vote = vote;
tower.last_vote = VoteTransaction::from(vote);
let mut slot_history = SlotHistory::default();
slot_history.add(0);
@ -3125,7 +3253,7 @@ pub mod test {
tower.vote_state.votes.push_back(Lockout::new(43));
tower.vote_state.votes.push_back(Lockout::new(44));
let vote = Vote::new(vec![44], Hash::default());
tower.last_vote = vote;
tower.last_vote = VoteTransaction::from(vote);
let mut slot_history = SlotHistory::default();
slot_history.add(42);
@ -3139,7 +3267,7 @@ pub mod test {
let mut tower = Tower::new_for_tests(10, 0.9);
tower.vote_state.votes.push_back(Lockout::new(0));
let vote = Vote::new(vec![0], Hash::default());
tower.last_vote = vote;
tower.last_vote = VoteTransaction::from(vote);
let mut slot_history = SlotHistory::default();
slot_history.add(0);
@ -3153,7 +3281,7 @@ pub mod test {
tower.vote_state.votes.push_back(Lockout::new(13));
tower.vote_state.votes.push_back(Lockout::new(14));
let vote = Vote::new(vec![14], Hash::default());
tower.last_vote = vote;
tower.last_vote = VoteTransaction::from(vote);
tower.initialize_root(12);
let mut slot_history = SlotHistory::default();


@ -54,6 +54,7 @@ pub mod sigverify_stage;
pub mod snapshot_packager_service;
pub mod stats_reporter_service;
pub mod system_monitor_service;
mod tower1_7_14;
pub mod tower_storage;
pub mod tpu;
pub mod tree_diff;


@ -21,7 +21,7 @@ use {
progress_map::{ForkProgress, ProgressMap, PropagatedStats},
repair_service::DuplicateSlotsResetReceiver,
rewards_recorder_service::RewardsRecorderSender,
tower_storage::{SavedTower, TowerStorage},
tower_storage::{SavedTower, SavedTowerVersions, TowerStorage},
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
voting_service::VoteOp,
window_service::DuplicateSlotReceiver,
@ -64,7 +64,7 @@ use {
timing::timestamp,
transaction::Transaction,
},
solana_vote_program::vote_state::Vote,
solana_vote_program::vote_state::VoteTransaction,
std::{
collections::{HashMap, HashSet},
result,
@ -425,6 +425,7 @@ impl ReplayStage {
last_refresh_time: Instant::now(),
last_print_time: Instant::now(),
};
loop {
// Stop getting entries if we get exit signal
if exit.load(Ordering::Relaxed) {
@ -561,7 +562,7 @@ impl ReplayStage {
&vote_account,
&ancestors,
&mut frozen_banks,
&tower,
&mut tower,
&mut progress,
&vote_tracker,
&cluster_slots,
@ -1823,7 +1824,7 @@ impl ReplayStage {
bank: &Bank,
vote_account_pubkey: &Pubkey,
authorized_voter_keypairs: &[Arc<Keypair>],
vote: Vote,
vote: VoteTransaction,
switch_fork_decision: &SwitchForkDecision,
vote_signatures: &mut Vec<Signature>,
has_new_vote_been_rooted: bool,
@ -2026,7 +2027,7 @@ impl ReplayStage {
.send(VoteOp::PushVote {
tx: vote_tx,
tower_slots,
saved_tower,
saved_tower: SavedTowerVersions::from(saved_tower),
})
.unwrap_or_else(|err| warn!("Error: {:?}", err));
}
@ -2299,7 +2300,7 @@ impl ReplayStage {
my_vote_pubkey: &Pubkey,
ancestors: &HashMap<u64, HashSet<u64>>,
frozen_banks: &mut Vec<Arc<Bank>>,
tower: &Tower,
tower: &mut Tower,
progress: &mut ProgressMap,
vote_tracker: &VoteTracker,
cluster_slots: &ClusterSlots,
@ -2320,6 +2321,70 @@ impl ReplayStage {
.expect("All frozen banks must exist in the Progress map")
.computed;
if !is_computed {
// Check if our tower is behind; if so (and the feature migration flag is in use),
// overwrite it with the newer bank's vote state.
if let (true, Some((_, vote_account))) = (
Tower::is_direct_vote_state_update_enabled(bank),
bank.get_vote_account(my_vote_pubkey),
) {
if let Some(mut bank_vote_state) =
vote_account.vote_state().as_ref().ok().cloned()
{
if bank_vote_state.last_voted_slot()
> tower.vote_state.last_voted_slot()
{
info!(
"Frozen bank vote state slot {:?}
is newer than our local vote state slot {:?},
adopting the bank vote state as our own.
Bank votes: {:?}, root: {:?},
Local votes: {:?}, root: {:?}",
bank_vote_state.last_voted_slot(),
tower.vote_state.last_voted_slot(),
bank_vote_state.votes,
bank_vote_state.root_slot,
tower.vote_state.votes,
tower.vote_state.root_slot
);
if let Some(local_root) = tower.vote_state.root_slot {
if bank_vote_state
.root_slot
.map(|bank_root| local_root > bank_root)
.unwrap_or(true)
{
// If the local root is larger than this on chain vote state
// root (possible due to supermajority roots being set on
// startup), then we need to adjust the tower
bank_vote_state.root_slot = Some(local_root);
bank_vote_state
.votes
.retain(|lockout| lockout.slot > local_root);
info!(
"Local root is larger than on chain root,
overwrote bank root {:?} and updated votes {:?}",
bank_vote_state.root_slot, bank_vote_state.votes
);
if let Some(first_vote) = bank_vote_state.votes.front() {
assert!(ancestors
.get(&first_vote.slot)
.expect(
"Ancestors map must contain an
entry for all slots on this fork
greater than `local_root` and less
than `bank_slot`"
)
.contains(&local_root));
}
}
}
tower.vote_state.root_slot = bank_vote_state.root_slot;
tower.vote_state.votes = bank_vote_state.votes;
}
}
}
let computed_bank_state = Tower::collect_vote_lockouts(
my_vote_pubkey,
bank_slot,
@ -3990,12 +4055,12 @@ pub mod tests {
.values()
.cloned()
.collect();
let tower = Tower::new_for_tests(0, 0.67);
let mut tower = Tower::new_for_tests(0, 0.67);
let newly_computed = ReplayStage::compute_bank_stats(
&my_vote_pubkey,
&ancestors,
&mut frozen_banks,
&tower,
&mut tower,
&mut progress,
&VoteTracker::default(),
&ClusterSlots::default(),
@ -4039,7 +4104,7 @@ pub mod tests {
&my_vote_pubkey,
&ancestors,
&mut frozen_banks,
&tower,
&mut tower,
&mut progress,
&VoteTracker::default(),
&ClusterSlots::default(),
@ -4075,7 +4140,7 @@ pub mod tests {
&my_vote_pubkey,
&ancestors,
&mut frozen_banks,
&tower,
&mut tower,
&mut progress,
&VoteTracker::default(),
&ClusterSlots::default(),
@ -4091,7 +4156,7 @@ pub mod tests {
fn test_same_weight_select_lower_slot() {
// Init state
let mut vote_simulator = VoteSimulator::new(1);
let tower = Tower::default();
let mut tower = Tower::default();
// Create the tree of banks in a BankForks object
let forks = tr(0) / (tr(1)) / (tr(2));
@ -4114,7 +4179,7 @@ pub mod tests {
&my_vote_pubkey,
&ancestors,
&mut frozen_banks,
&tower,
&mut tower,
&mut vote_simulator.progress,
&VoteTracker::default(),
&ClusterSlots::default(),
@ -4170,7 +4235,7 @@ pub mod tests {
// Set the voting behavior
let mut cluster_votes = HashMap::new();
let votes = vec![0, 2];
let votes = vec![2];
cluster_votes.insert(my_node_pubkey, votes.clone());
vote_simulator.fill_bank_forks(forks, &cluster_votes, true);
@ -4195,7 +4260,7 @@ pub mod tests {
&my_vote_pubkey,
&vote_simulator.bank_forks.read().unwrap().ancestors(),
&mut frozen_banks,
&tower,
&mut tower,
&mut vote_simulator.progress,
&VoteTracker::default(),
&ClusterSlots::default(),
@ -5110,12 +5175,12 @@ pub mod tests {
);
// Update propagation status
let tower = Tower::new_for_tests(0, 0.67);
let mut tower = Tower::new_for_tests(0, 0.67);
ReplayStage::compute_bank_stats(
&validator_node_to_vote_keys[&my_pubkey],
&ancestors,
&mut frozen_banks,
&tower,
&mut tower,
&mut progress,
&vote_tracker,
&ClusterSlots::default(),
@ -5498,7 +5563,7 @@ pub mod tests {
&Pubkey::new_unique(),
&ancestors,
&mut frozen_banks,
&tower,
&mut tower,
&mut progress,
&VoteTracker::default(),
&ClusterSlots::default(),
@ -5625,7 +5690,7 @@ pub mod tests {
&Pubkey::new_unique(),
&ancestors,
&mut frozen_banks,
&tower,
&mut tower,
&mut progress,
&VoteTracker::default(),
&ClusterSlots::default(),

core/src/tower1_7_14.rs (new file, 64 additions)

@ -0,0 +1,64 @@
use crate::consensus::{SwitchForkDecision, TowerError};
use solana_sdk::{
clock::Slot,
hash::Hash,
pubkey::Pubkey,
signature::{Signature, Signer},
};
use solana_vote_program::vote_state::{BlockTimestamp, Vote, VoteState};
#[frozen_abi(digest = "7phMrqmBo2D3rXPdhBj8CpjRvvmx9qgpcU4cDGkL3W9q")]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)]
pub struct Tower1_7_14 {
pub(crate) node_pubkey: Pubkey,
pub(crate) threshold_depth: usize,
pub(crate) threshold_size: f64,
pub(crate) vote_state: VoteState,
pub(crate) last_vote: Vote,
#[serde(skip)]
// The blockhash used in the last vote transaction may or may not equal the
// blockhash of the voted block itself, depending on whether the vote slot was refreshed.
// For instance, a vote for slot 5 may be refreshed/resubmitted for inclusion in
// block 10, in which case `last_vote_tx_blockhash` equals the blockhash of 10, not 5.
pub(crate) last_vote_tx_blockhash: Hash,
pub(crate) last_timestamp: BlockTimestamp,
#[serde(skip)]
// Restored last voted slot which cannot be found in SlotHistory at replayed root
// (This is a special field for slashing-free validator restart with edge cases).
// This could be emptied after some time; but left intact indefinitely for easier
// implementation
// Further, stray slot can be stale or not. `Stale` here means whether given
// bank_forks (=~ ledger) lacks the slot or not.
pub(crate) stray_restored_slot: Option<Slot>,
#[serde(skip)]
pub(crate) last_switch_threshold_check: Option<(Slot, SwitchForkDecision)>,
}
#[frozen_abi(digest = "CxwFFxKfn6ez6wifDKr5WYr3eu2PsWUKdMYp3LX8Xj52")]
#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)]
pub struct SavedTower1_7_14 {
pub(crate) signature: Signature,
pub(crate) data: Vec<u8>,
#[serde(skip)]
pub(crate) node_pubkey: Pubkey,
}
impl SavedTower1_7_14 {
pub fn new<T: Signer>(tower: &Tower1_7_14, keypair: &T) -> Result<Self, TowerError> {
let node_pubkey = keypair.pubkey();
if tower.node_pubkey != node_pubkey {
return Err(TowerError::WrongTower(format!(
"node_pubkey is {:?} but found tower for {:?}",
node_pubkey, tower.node_pubkey
)));
}
let data = bincode::serialize(tower)?;
let signature = keypair.sign_message(&data);
Ok(Self {
signature,
data,
node_pubkey,
})
}
}


@ -1,5 +1,6 @@
use {
crate::consensus::{Result, Tower, TowerError},
crate::consensus::{Result, Tower, TowerError, TowerVersions},
crate::tower1_7_14::SavedTower1_7_14,
solana_sdk::{
pubkey::Pubkey,
signature::{Signature, Signer},
@ -12,6 +13,67 @@ use {
},
};
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)]
pub enum SavedTowerVersions {
V1_17_14(SavedTower1_7_14),
Current(SavedTower),
}
impl SavedTowerVersions {
fn try_into_tower(&self, node_pubkey: &Pubkey) -> Result<Tower> {
// This method assumes that `self` was just deserialized
assert_eq!(self.pubkey(), Pubkey::default());
let tv = match self {
SavedTowerVersions::V1_17_14(t) => {
if !t.signature.verify(node_pubkey.as_ref(), &t.data) {
return Err(TowerError::InvalidSignature);
}
bincode::deserialize(&t.data).map(TowerVersions::V1_17_14)
}
SavedTowerVersions::Current(t) => {
if !t.signature.verify(node_pubkey.as_ref(), &t.data) {
return Err(TowerError::InvalidSignature);
}
bincode::deserialize(&t.data).map(TowerVersions::Current)
}
};
tv.map_err(|e| e.into()).and_then(|tv: TowerVersions| {
let tower = tv.convert_to_current();
if tower.node_pubkey != *node_pubkey {
return Err(TowerError::WrongTower(format!(
"node_pubkey is {:?} but found tower for {:?}",
node_pubkey, tower.node_pubkey
)));
}
Ok(tower)
})
}
fn serialize_into(&self, file: &mut File) -> Result<()> {
bincode::serialize_into(file, self).map_err(|e| e.into())
}
fn pubkey(&self) -> Pubkey {
match self {
SavedTowerVersions::V1_17_14(t) => t.node_pubkey,
SavedTowerVersions::Current(t) => t.node_pubkey,
}
}
}
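Both saved variants carry the bincode-serialized tower plus a signature over those bytes by the node identity, and loading only succeeds when that signature checks out against the expected pubkey. A condensed sketch of the sign/verify pair, mirroring `SavedTower::new` and `try_into_tower` above (not part of the diff; assumes `Signer` is in scope):

    fn sign_and_check(tower_bytes: &[u8], keypair: &Keypair, node_pubkey: &Pubkey) -> bool {
        // Sign at save time...
        let signature = keypair.sign_message(tower_bytes);
        // ...and reject a restored file whose signature does not match the expected node.
        signature.verify(node_pubkey.as_ref(), tower_bytes)
    }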
impl From<SavedTower> for SavedTowerVersions {
fn from(tower: SavedTower) -> SavedTowerVersions {
SavedTowerVersions::Current(tower)
}
}
impl From<SavedTower1_7_14> for SavedTowerVersions {
fn from(tower: SavedTower1_7_14) -> SavedTowerVersions {
SavedTowerVersions::V1_17_14(tower)
}
}
#[frozen_abi(digest = "Gaxfwvx5MArn52mKZQgzHmDCyn5YfCuTHvp5Et3rFfpp")]
#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)]
pub struct SavedTower {
@ -39,45 +101,25 @@ impl SavedTower {
node_pubkey,
})
}
pub fn try_into_tower(self, node_pubkey: &Pubkey) -> Result<Tower> {
// This method assumes that `self` was just deserialized
assert_eq!(self.node_pubkey, Pubkey::default());
if !self.signature.verify(node_pubkey.as_ref(), &self.data) {
return Err(TowerError::InvalidSignature);
}
bincode::deserialize(&self.data)
.map_err(|e| e.into())
.and_then(|tower: Tower| {
if tower.node_pubkey != *node_pubkey {
return Err(TowerError::WrongTower(format!(
"node_pubkey is {:?} but found tower for {:?}",
node_pubkey, tower.node_pubkey
)));
}
Ok(tower)
})
}
}
pub trait TowerStorage: Sync + Send {
fn load(&self, node_pubkey: &Pubkey) -> Result<SavedTower>;
fn store(&self, saved_tower: &SavedTower) -> Result<()>;
fn load(&self, node_pubkey: &Pubkey) -> Result<Tower>;
fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()>;
}
#[derive(Debug, Default, Clone, PartialEq)]
pub struct NullTowerStorage {}
impl TowerStorage for NullTowerStorage {
fn load(&self, _node_pubkey: &Pubkey) -> Result<SavedTower> {
fn load(&self, _node_pubkey: &Pubkey) -> Result<Tower> {
Err(TowerError::IoError(io::Error::new(
io::ErrorKind::Other,
"NullTowerStorage::load() not available",
)))
}
fn store(&self, _saved_tower: &SavedTower) -> Result<()> {
fn store(&self, _saved_tower: &SavedTowerVersions) -> Result<()> {
Ok(())
}
}
@ -92,35 +134,75 @@ impl FileTowerStorage {
Self { tower_path }
}
pub fn filename(&self, node_pubkey: &Pubkey) -> PathBuf {
// Old filename for towers pre 1.9 (VoteStateUpdate)
pub fn old_filename(&self, node_pubkey: &Pubkey) -> PathBuf {
self.tower_path
.join(format!("tower-{}", node_pubkey))
.with_extension("bin")
}
pub fn filename(&self, node_pubkey: &Pubkey) -> PathBuf {
self.tower_path
.join(format!("tower-1_9-{}", node_pubkey))
.with_extension("bin")
}
#[cfg(test)]
fn store_old(&self, saved_tower: &SavedTower1_7_14) -> Result<()> {
let pubkey = saved_tower.node_pubkey;
let filename = self.old_filename(&pubkey);
trace!("store: {}", filename.display());
let new_filename = filename.with_extension("bin.new");
{
// overwrite anything if exists
let file = File::create(&new_filename)?;
bincode::serialize_into(file, saved_tower)?;
// file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster!
}
fs::rename(&new_filename, &filename)?;
// self.path.parent().sync_all() hurts performance same as the above sync
Ok(())
}
}
impl TowerStorage for FileTowerStorage {
fn load(&self, node_pubkey: &Pubkey) -> Result<SavedTower> {
fn load(&self, node_pubkey: &Pubkey) -> Result<Tower> {
let filename = self.filename(node_pubkey);
trace!("load {}", filename.display());
// Ensure the parent dir is created here, because restore() always precedes save()
fs::create_dir_all(&filename.parent().unwrap())?;
let file = File::open(&filename)?;
let mut stream = BufReader::new(file);
bincode::deserialize_from(&mut stream).map_err(|e| e.into())
if let Ok(file) = File::open(&filename) {
// New format
let mut stream = BufReader::new(file);
bincode::deserialize_from(&mut stream)
.map_err(|e| e.into())
.and_then(|t: SavedTowerVersions| t.try_into_tower(node_pubkey))
} else {
// Old format
let file = File::open(&self.old_filename(node_pubkey))?;
let mut stream = BufReader::new(file);
bincode::deserialize_from(&mut stream)
.map_err(|e| e.into())
.and_then(|t: SavedTower1_7_14| {
SavedTowerVersions::from(t).try_into_tower(node_pubkey)
})
}
}
fn store(&self, saved_tower: &SavedTower) -> Result<()> {
let filename = self.filename(&saved_tower.node_pubkey);
fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()> {
let pubkey = saved_tower.pubkey();
let filename = self.filename(&pubkey);
trace!("store: {}", filename.display());
let new_filename = filename.with_extension("bin.new");
{
// overwrite anything if exists
let mut file = File::create(&new_filename)?;
bincode::serialize_into(&mut file, saved_tower)?;
saved_tower.serialize_into(&mut file)?;
// file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster!
}
fs::rename(&new_filename, &filename)?;
@ -194,7 +276,7 @@ impl EtcdTowerStorage {
}
impl TowerStorage for EtcdTowerStorage {
fn load(&self, node_pubkey: &Pubkey) -> Result<SavedTower> {
fn load(&self, node_pubkey: &Pubkey) -> Result<Tower> {
let (instance_key, tower_key) = Self::get_keys(node_pubkey);
let mut client = self.client.write().unwrap();
@ -236,7 +318,9 @@ impl TowerStorage for EtcdTowerStorage {
for op_response in response.op_responses() {
if let etcd_client::TxnOpResponse::Get(get_response) = op_response {
if let Some(kv) = get_response.kvs().get(0) {
return bincode::deserialize_from(kv.value()).map_err(|e| e.into());
return bincode::deserialize_from(kv.value())
.map_err(|e| e.into())
.and_then(|t: SavedTowerVersions| t.try_into_tower(node_pubkey));
}
}
}
@ -248,8 +332,8 @@ impl TowerStorage for EtcdTowerStorage {
)))
}
fn store(&self, saved_tower: &SavedTower) -> Result<()> {
let (instance_key, tower_key) = Self::get_keys(&saved_tower.node_pubkey);
fn store(&self, saved_tower: &SavedTowerVersions) -> Result<()> {
let (instance_key, tower_key) = Self::get_keys(&saved_tower.pubkey());
let mut client = self.client.write().unwrap();
let txn = etcd_client::Txn::new()
@ -260,7 +344,7 @@ impl TowerStorage for EtcdTowerStorage {
)])
.and_then(vec![etcd_client::TxnOp::put(
tower_key,
bincode::serialize(saved_tower)?,
bincode::serialize(&saved_tower)?,
None,
)]);
@ -276,9 +360,63 @@ impl TowerStorage for EtcdTowerStorage {
if !response.succeeded() {
return Err(TowerError::IoError(io::Error::new(
io::ErrorKind::Other,
format!("Lost etcd instance lock for {}", saved_tower.node_pubkey),
format!("Lost etcd instance lock for {}", saved_tower.pubkey()),
)));
}
Ok(())
}
}
#[cfg(test)]
pub mod test {
use {
super::*,
crate::{
consensus::Tower,
tower1_7_14::{SavedTower1_7_14, Tower1_7_14},
},
solana_sdk::{hash::Hash, signature::Keypair},
solana_vote_program::vote_state::{
BlockTimestamp, Lockout, Vote, VoteState, VoteTransaction, MAX_LOCKOUT_HISTORY,
},
tempfile::TempDir,
};
#[test]
fn test_tower_migration() {
let tower_path = TempDir::new().unwrap();
let identity_keypair = Keypair::new();
let node_pubkey = identity_keypair.pubkey();
let mut vote_state = VoteState::default();
vote_state
.votes
.resize(MAX_LOCKOUT_HISTORY, Lockout::default());
vote_state.root_slot = Some(1);
let vote = Vote::new(vec![1, 2, 3, 4], Hash::default());
let tower_storage = FileTowerStorage::new(tower_path.path().to_path_buf());
let old_tower = Tower1_7_14 {
node_pubkey,
threshold_depth: 10,
threshold_size: 0.9,
vote_state,
last_vote: vote.clone(),
last_timestamp: BlockTimestamp::default(),
last_vote_tx_blockhash: Hash::default(),
stray_restored_slot: Some(2),
last_switch_threshold_check: Option::default(),
};
{
let saved_tower = SavedTower1_7_14::new(&old_tower, &identity_keypair).unwrap();
tower_storage.store_old(&saved_tower).unwrap();
}
let loaded = Tower::restore(&tower_storage, &node_pubkey).unwrap();
assert_eq!(loaded.node_pubkey, old_tower.node_pubkey);
assert_eq!(loaded.last_vote(), VoteTransaction::from(vote));
assert_eq!(loaded.vote_state.root_slot, Some(1));
assert_eq!(loaded.stray_restored_slot(), None);
}
}
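End to end, the storage path now goes through the versioned wrapper: `Tower::save` wraps the current tower in `SavedTowerVersions` before writing, and `Tower::restore` returns a plain `Tower` whichever on-disk format it finds. A usage sketch (not from the diff; assumes a `ledger_path`, a populated `tower`, and the node's `identity_keypair`):

    let tower_storage = FileTowerStorage::new(ledger_path.to_path_buf());
    tower.save(&tower_storage, &identity_keypair).unwrap(); // writes tower-1_9-<pubkey>.bin
    let restored = Tower::restore(&tower_storage, &identity_keypair.pubkey()).unwrap(); // reads new or legacy format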


@ -1,5 +1,5 @@
use {
crate::tower_storage::{SavedTower, TowerStorage},
crate::tower_storage::{SavedTowerVersions, TowerStorage},
crossbeam_channel::Receiver,
solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure::Measure,
@ -16,7 +16,7 @@ pub enum VoteOp {
PushVote {
tx: Transaction,
tower_slots: Vec<Slot>,
saved_tower: SavedTower,
saved_tower: SavedTowerVersions,
},
RefreshVote {
tx: Transaction,


@ -5,7 +5,7 @@ use {
solana_core::{
broadcast_stage::BroadcastStageType,
consensus::{Tower, SWITCH_FORK_THRESHOLD},
tower_storage::FileTowerStorage,
tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage},
validator::ValidatorConfig,
},
solana_gossip::gossip_service::discover_cluster,
@ -406,3 +406,15 @@ pub fn test_faulty_node(
(cluster, validator_keys)
}
pub fn save_tower(tower_path: &Path, tower: &Tower, node_keypair: &Keypair) {
let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf());
let saved_tower = SavedTower::new(tower, node_keypair).unwrap();
file_tower_storage
.store(&SavedTowerVersions::from(saved_tower))
.unwrap();
}
pub fn root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<Slot> {
restore_tower(tower_path, node_pubkey).map(|tower| tower.root())
}

View File

@ -3,8 +3,8 @@ use {
assert_matches::assert_matches,
common::{
copy_blocks, create_custom_leader_schedule, last_vote_in_tower, ms_for_n_slots,
open_blockstore, purge_slots, remove_tower, restore_tower, run_cluster_partition,
run_kill_partition_switch_threshold, test_faulty_node,
open_blockstore, purge_slots, remove_tower, restore_tower, root_in_tower,
run_cluster_partition, run_kill_partition_switch_threshold, save_tower, test_faulty_node,
wait_for_last_vote_in_tower_to_land_in_ledger, RUST_LOG_FILTER,
},
crossbeam_channel::{unbounded, Receiver},
@ -23,7 +23,7 @@ use {
consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH},
optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
replay_stage::DUPLICATE_THRESHOLD,
tower_storage::{FileTowerStorage, SavedTower, TowerStorage},
tower_storage::FileTowerStorage,
validator::ValidatorConfig,
},
solana_download_utils::download_snapshot_archive,
@ -1913,16 +1913,6 @@ fn test_validator_saves_tower() {
assert!(tower4.root() >= new_root);
}
fn save_tower(tower_path: &Path, tower: &Tower, node_keypair: &Keypair) {
let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf());
let saved_tower = SavedTower::new(tower, node_keypair).unwrap();
file_tower_storage.store(&saved_tower).unwrap();
}
fn root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<Slot> {
restore_tower(tower_path, node_pubkey).map(|tower| tower.root())
}
// This test verifies that even if votes from a validator end up taking too long to land, and thus
// some of the referenced slots are no longer present in the slot hashes sysvar,
// consensus can still be attained.
@ -2318,6 +2308,14 @@ fn test_hard_fork_invalidates_tower() {
validator_b_info.config.wait_for_supermajority = Some(hard_fork_slot);
validator_b_info.config.expected_shred_version = Some(expected_shred_version);
// Clear ledger of all slots post hard fork
{
let blockstore_a = open_blockstore(&validator_a_info.info.ledger_path);
let blockstore_b = open_blockstore(&validator_b_info.info.ledger_path);
purge_slots(&blockstore_a, hard_fork_slot + 1, 100);
purge_slots(&blockstore_b, hard_fork_slot + 1, 100);
}
// restart validator A first
let cluster_for_a = cluster.clone();
// Spawn a thread because wait_for_supermajority blocks in Validator::new()!
@ -2374,91 +2372,6 @@ fn test_run_test_load_program_accounts_root() {
run_test_load_program_accounts(CommitmentConfig::finalized());
}
#[test]
#[serial]
fn test_restart_tower_rollback() {
// Test node crashing and failing to save its tower before restart
solana_logger::setup_with_default(RUST_LOG_FILTER);
// First set up the cluster with 4 nodes
let slots_per_epoch = 2048;
let node_stakes = vec![10000, 1];
let validator_strings = vec![
"28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
"2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
];
let validator_b_keypair = Arc::new(Keypair::from_base58_string(validator_strings[1]));
let validator_b_pubkey = validator_b_keypair.pubkey();
let validator_keys = validator_strings
.iter()
.map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
.take(node_stakes.len())
.collect::<Vec<_>>();
let mut config = ClusterConfig {
cluster_lamports: 100_000,
node_stakes: node_stakes.clone(),
validator_configs: make_identical_validator_configs(
&ValidatorConfig::default_for_test(),
node_stakes.len(),
),
validator_keys: Some(validator_keys),
slots_per_epoch,
stakers_slot_offset: slots_per_epoch,
skip_warmup_slots: true,
..ClusterConfig::default()
};
let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
let val_b_ledger_path = cluster.ledger_path(&validator_b_pubkey);
let mut earlier_tower: Tower;
loop {
sleep(Duration::from_millis(1000));
// Grab the current saved tower
earlier_tower = restore_tower(&val_b_ledger_path, &validator_b_pubkey).unwrap();
if earlier_tower.last_voted_slot().unwrap_or(0) > 1 {
break;
}
}
let exited_validator_info: ClusterValidatorInfo;
loop {
sleep(Duration::from_millis(1000));
// Wait for second, lesser staked validator to make a root past the earlier_tower's
// latest vote slot, then exit that validator
if let Some(root) = root_in_tower(&val_b_ledger_path, &validator_b_pubkey) {
if root
> earlier_tower
.last_voted_slot()
.expect("Earlier tower must have at least one vote")
{
exited_validator_info = cluster.exit_node(&validator_b_pubkey);
break;
}
}
}
// Now rewrite the tower with the *earlier_tower*
save_tower(&val_b_ledger_path, &earlier_tower, &validator_b_keypair);
cluster.restart_node(
&validator_b_pubkey,
exited_validator_info,
SocketAddrSpace::Unspecified,
);
// Check this node is making new roots
cluster.check_for_new_roots(
20,
"test_restart_tower_rollback",
SocketAddrSpace::Unspecified,
);
}
#[test]
#[serial]
fn test_run_test_load_program_accounts_partition_root() {


@ -3,19 +3,19 @@
#![allow(clippy::integer_arithmetic)]
use {
common::{
copy_blocks, last_vote_in_tower, open_blockstore, purge_slots, remove_tower,
wait_for_last_vote_in_tower_to_land_in_ledger, RUST_LOG_FILTER,
copy_blocks, last_vote_in_tower, open_blockstore, purge_slots, remove_tower, restore_tower,
root_in_tower, save_tower, wait_for_last_vote_in_tower_to_land_in_ledger, RUST_LOG_FILTER,
},
log::*,
serial_test::serial,
solana_core::validator::ValidatorConfig,
solana_core::{consensus::Tower, validator::ValidatorConfig},
solana_ledger::{
ancestor_iterator::AncestorIterator,
blockstore::Blockstore,
blockstore_db::{AccessType, BlockstoreOptions},
},
solana_local_cluster::{
cluster::Cluster,
cluster::{Cluster, ClusterValidatorInfo},
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::*,
},
@ -358,3 +358,89 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
info!("THIS TEST expected no violation. And indeed, there was none, thanks to persisted tower.");
}
}
#[test]
#[serial]
#[ignore]
fn test_restart_tower_rollback() {
// Test node crashing and failing to save its tower before restart
solana_logger::setup_with_default(RUST_LOG_FILTER);
// First set up the cluster with 4 nodes
let slots_per_epoch = 2048;
let node_stakes = vec![10000, 1];
let validator_strings = vec![
"28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
"2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
];
let validator_b_keypair = Arc::new(Keypair::from_base58_string(validator_strings[1]));
let validator_b_pubkey = validator_b_keypair.pubkey();
let validator_keys = validator_strings
.iter()
.map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
.take(node_stakes.len())
.collect::<Vec<_>>();
let mut config = ClusterConfig {
cluster_lamports: 100_000,
node_stakes: node_stakes.clone(),
validator_configs: make_identical_validator_configs(
&ValidatorConfig::default_for_test(),
node_stakes.len(),
),
validator_keys: Some(validator_keys),
slots_per_epoch,
stakers_slot_offset: slots_per_epoch,
skip_warmup_slots: true,
..ClusterConfig::default()
};
let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
let val_b_ledger_path = cluster.ledger_path(&validator_b_pubkey);
let mut earlier_tower: Tower;
loop {
sleep(Duration::from_millis(1000));
// Grab the current saved tower
earlier_tower = restore_tower(&val_b_ledger_path, &validator_b_pubkey).unwrap();
if earlier_tower.last_voted_slot().unwrap_or(0) > 1 {
break;
}
}
let exited_validator_info: ClusterValidatorInfo;
loop {
sleep(Duration::from_millis(1000));
// Wait for second, lesser staked validator to make a root past the earlier_tower's
// latest vote slot, then exit that validator
if let Some(root) = root_in_tower(&val_b_ledger_path, &validator_b_pubkey) {
if root
> earlier_tower
.last_voted_slot()
.expect("Earlier tower must have at least one vote")
{
exited_validator_info = cluster.exit_node(&validator_b_pubkey);
break;
}
}
}
// Now rewrite the tower with the *earlier_tower*
save_tower(&val_b_ledger_path, &earlier_tower, &validator_b_keypair);
cluster.restart_node(
&validator_b_pubkey,
exited_validator_info,
SocketAddrSpace::Unspecified,
);
// Check this node is making new roots
cluster.check_for_new_roots(
20,
"test_restart_tower_rollback",
SocketAddrSpace::Unspecified,
);
}


@ -61,6 +61,9 @@ pub enum VoteError {
#[error("every slot in the vote was older than the SlotHashes history")]
VotesTooOldAllFiltered,
#[error("Proposed root is not in slot hashes")]
RootOnDifferentFork,
}
impl<E> DecodeError<E> for VoteError {


@ -20,7 +20,6 @@ use {
sysvar::clock::Clock,
},
std::{
boxed::Box,
cmp::Ordering,
collections::{HashSet, VecDeque},
fmt::Debug,
@ -41,6 +40,93 @@ pub const MAX_EPOCH_CREDITS_HISTORY: usize = 64;
// Offset of VoteState::prior_voters, for determining initialization status without deserialization
const DEFAULT_PRIOR_VOTERS_OFFSET: usize = 82;
#[frozen_abi(digest = "6LBwH5w3WyAWZhsM3KTG9QZP7nYBhcC61K33kHR6gMAD")]
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, AbiEnumVisitor, AbiExample)]
pub enum VoteTransaction {
Vote(Vote),
VoteStateUpdate(VoteStateUpdate),
}
impl VoteTransaction {
pub fn slots(&self) -> Vec<Slot> {
match self {
VoteTransaction::Vote(vote) => vote.slots.clone(),
VoteTransaction::VoteStateUpdate(vote_state_update) => vote_state_update.slots(),
}
}
pub fn slot(&self, i: usize) -> Slot {
match self {
VoteTransaction::Vote(vote) => vote.slots[i],
VoteTransaction::VoteStateUpdate(vote_state_update) => {
vote_state_update.lockouts[i].slot
}
}
}
pub fn len(&self) -> usize {
match self {
VoteTransaction::Vote(vote) => vote.slots.len(),
VoteTransaction::VoteStateUpdate(vote_state_update) => vote_state_update.lockouts.len(),
}
}
pub fn is_empty(&self) -> bool {
match self {
VoteTransaction::Vote(vote) => vote.slots.is_empty(),
VoteTransaction::VoteStateUpdate(vote_state_update) => {
vote_state_update.lockouts.is_empty()
}
}
}
pub fn hash(&self) -> Hash {
match self {
VoteTransaction::Vote(vote) => vote.hash,
VoteTransaction::VoteStateUpdate(vote_state_update) => vote_state_update.hash,
}
}
pub fn timestamp(&self) -> Option<UnixTimestamp> {
match self {
VoteTransaction::Vote(vote) => vote.timestamp,
VoteTransaction::VoteStateUpdate(vote_state_update) => vote_state_update.timestamp,
}
}
pub fn set_timestamp(&mut self, ts: Option<UnixTimestamp>) {
match self {
VoteTransaction::Vote(vote) => vote.timestamp = ts,
VoteTransaction::VoteStateUpdate(vote_state_update) => vote_state_update.timestamp = ts,
}
}
pub fn last_voted_slot(&self) -> Option<Slot> {
match self {
VoteTransaction::Vote(vote) => vote.slots.last().copied(),
VoteTransaction::VoteStateUpdate(vote_state_update) => {
Some(vote_state_update.lockouts.back()?.slot)
}
}
}
pub fn last_voted_slot_hash(&self) -> Option<(Slot, Hash)> {
Some((self.last_voted_slot()?, self.hash()))
}
}
impl From<Vote> for VoteTransaction {
fn from(vote: Vote) -> Self {
VoteTransaction::Vote(vote)
}
}
impl From<VoteStateUpdate> for VoteTransaction {
fn from(vote_state_update: VoteStateUpdate) -> Self {
VoteTransaction::VoteStateUpdate(vote_state_update)
}
}
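Both flavors expose the same read accessors, so consensus code can stay agnostic about which instruction will ultimately be built; for example (a small sketch, not part of the diff):

    let tx = VoteTransaction::from(Vote::new(vec![1, 2, 3], Hash::default()));
    assert_eq!(tx.slots(), vec![1, 2, 3]);
    assert_eq!(tx.last_voted_slot(), Some(3));
    assert_eq!(tx.last_voted_slot_hash(), Some((3, Hash::default())));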
#[frozen_abi(digest = "Ch2vVEwos2EjAVqSHCyJjnN2MNX1yrpapZTGhMSCjWUH")]
#[derive(Serialize, Default, Deserialize, Debug, PartialEq, Eq, Clone, AbiExample)]
pub struct Vote {
@ -93,6 +179,7 @@ impl Lockout {
}
}
#[frozen_abi(digest = "BctadFJjUKbvPJzr6TszbX6rBfQUNSRKpKKngkzgXgeY")]
#[derive(Serialize, Default, Deserialize, Debug, PartialEq, Eq, Clone, AbiExample)]
pub struct VoteStateUpdate {
/// The proposed tower
@ -132,6 +219,10 @@ impl VoteStateUpdate {
timestamp: None,
}
}
pub fn slots(&self) -> Vec<Slot> {
self.lockouts.iter().map(|lockout| lockout.slot).collect()
}
}
#[derive(Default, Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy)]
@ -382,6 +473,12 @@ impl VoteState {
// to the current vote state root for safety.
if earliest_slot_hash_in_history > new_proposed_root {
vote_state_update.root = self.root_slot;
} else if !slot_hashes
// Verify that the root is in slot hashes
.iter()
.any(|&(slot, _)| slot == new_proposed_root)
{
return Err(VoteError::RootOnDifferentFork);
}
}
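The new `RootOnDifferentFork` error covers a proposed root that is recent enough to be checkable against the SlotHashes history but does not actually appear there, meaning it lies on another fork. A standalone sketch of just that check (not from the diff; takes `slot_hashes` as a plain slice rather than the sysvar type):

    fn check_root_in_slot_hashes(
        new_proposed_root: Slot,
        slot_hashes: &[(Slot, Hash)],
    ) -> Result<(), VoteError> {
        // The proposed root must be one of the slots on this fork's recent history.
        if slot_hashes.iter().any(|&(slot, _)| slot == new_proposed_root) {
            Ok(())
        } else {
            Err(VoteError::RootOnDifferentFork)
        }
    }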
@ -779,7 +876,6 @@ impl VoteState {
if vote.slots.is_empty() {
return Err(VoteError::EmptySlots);
}
let filtered_vote_slots = feature_set.and_then(|feature_set| {
if feature_set.is_active(&filter_votes_outside_slot_hashes::id()) {
let earliest_slot_in_history =
@ -1537,8 +1633,10 @@ mod tests {
let versioned = VoteStateVersions::new_current(vote_state);
assert!(VoteState::serialize(&versioned, &mut buffer[0..4]).is_err());
VoteState::serialize(&versioned, &mut buffer).unwrap();
let des = VoteState::deserialize(&buffer).unwrap();
assert_eq!(des, versioned.convert_to_current(),);
assert_eq!(
VoteState::deserialize(&buffer).unwrap(),
versioned.convert_to_current()
);
}
#[test]