Distinguish switch/non-switching votes in ReplayStage (#10218)

* Add SwitchForkDecision, change vote instruction based on decision

* Factor out SelectVoteAndResetForkResult

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin 2020-05-29 14:40:36 -07:00 committed by GitHub
parent 284e83e619
commit 778078e1dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 244 additions and 108 deletions

View File

@ -9,10 +9,14 @@ use solana_sdk::{
account::Account,
clock::{Slot, UnixTimestamp},
hash::Hash,
instruction::Instruction,
pubkey::Pubkey,
};
use solana_vote_program::vote_state::{
BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY, TIMESTAMP_SLOT_INTERVAL,
use solana_vote_program::{
vote_instruction,
vote_state::{
BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY, TIMESTAMP_SLOT_INTERVAL,
},
};
use std::{
collections::{BTreeMap, HashMap, HashSet},
@ -20,6 +24,39 @@ use std::{
sync::Arc,
};
/// Outcome of `Tower::check_switch_threshold`: whether the validator may
/// vote on a candidate slot that may lie on a different fork than its
/// last vote, and if so, what kind of vote instruction to build.
#[derive(PartialEq, Clone, Debug)]
pub enum SwitchForkDecision {
    /// Switching forks is justified; carries the switching-proof hash to
    /// embed in a `vote_switch` instruction.
    SwitchProof(Hash),
    /// The candidate slot is on the same fork as (a descendant of) the
    /// last vote, so a plain vote suffices — no proof needed.
    NoSwitch,
    /// Insufficient locked-out stake on other forks to justify switching;
    /// no vote instruction may be produced for the candidate slot.
    FailedSwitchThreshold,
}
impl SwitchForkDecision {
    /// Builds the vote instruction matching this decision.
    ///
    /// Returns `None` for `FailedSwitchThreshold` (voting is not allowed),
    /// a plain `vote` instruction for `NoSwitch`, and a `vote_switch`
    /// instruction carrying the proof hash for `SwitchProof`.
    pub fn to_vote_instruction(
        &self,
        vote: Vote,
        vote_account_pubkey: &Pubkey,
        authorized_voter_pubkey: &Pubkey,
    ) -> Option<Instruction> {
        let instruction = match self {
            // Switching was rejected: no vote may be cast.
            Self::FailedSwitchThreshold => return None,
            Self::NoSwitch => {
                vote_instruction::vote(vote_account_pubkey, authorized_voter_pubkey, vote)
            }
            Self::SwitchProof(proof_hash) => vote_instruction::vote_switch(
                vote_account_pubkey,
                authorized_voter_pubkey,
                vote,
                *proof_hash,
            ),
        };
        Some(instruction)
    }
}
pub const VOTE_THRESHOLD_DEPTH: usize = 8;
pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64;
pub const SWITCH_FORK_THRESHOLD: f64 = 0.38;
@ -345,7 +382,7 @@ impl Tower {
progress: &ProgressMap,
total_stake: u64,
epoch_vote_accounts: &HashMap<Pubkey, (u64, Account)>,
) -> bool {
) -> SwitchForkDecision {
self.last_vote()
.slots
.last()
@ -355,14 +392,18 @@ impl Tower {
if switch_slot == *last_vote || switch_slot_ancestors.contains(last_vote) {
// If the `switch_slot is a descendant of the last vote,
// no switching proof is neceessary
return true;
// no switching proof is necessary
return SwitchForkDecision::NoSwitch;
}
// Should never consider switching to an ancestor
// of your last vote
assert!(!last_vote_ancestors.contains(&switch_slot));
// By this point, we know the `switch_slot` is on a different fork
// (is neither an ancestor nor descendant of `last_vote`), so a
// switching proof is necessary
let switch_proof = Hash::default();
let mut locked_out_stake = 0;
let mut locked_out_vote_accounts = HashSet::new();
for (candidate_slot, descendants) in descendants.iter() {
@ -423,9 +464,14 @@ impl Tower {
}
}
}
(locked_out_stake as f64 / total_stake as f64) > SWITCH_FORK_THRESHOLD
if (locked_out_stake as f64 / total_stake as f64) > SWITCH_FORK_THRESHOLD {
SwitchForkDecision::SwitchProof(switch_proof)
} else {
SwitchForkDecision::FailedSwitchThreshold
}
})
.unwrap_or(true)
.unwrap_or(SwitchForkDecision::NoSwitch)
}
pub fn check_vote_stake_threshold(
@ -583,7 +629,7 @@ pub mod test {
cluster_info_vote_listener::VoteTracker,
cluster_slots::ClusterSlots,
progress_map::ForkProgress,
replay_stage::{HeaviestForkFailures, ReplayStage},
replay_stage::{HeaviestForkFailures, ReplayStage, SelectVoteAndResetForkResult},
};
use solana_ledger::bank_forks::BankForks;
use solana_runtime::{
@ -716,7 +762,10 @@ pub mod test {
// Try to vote on the given slot
let descendants = self.bank_forks.read().unwrap().descendants();
let (_, _, failure_reasons) = ReplayStage::select_vote_and_reset_forks(
let SelectVoteAndResetForkResult {
heaviest_fork_failures,
..
} = ReplayStage::select_vote_and_reset_forks(
&Some(vote_bank.clone()),
&None,
&ancestors,
@ -727,8 +776,8 @@ pub mod test {
// Make sure this slot isn't locked out or failing threshold
info!("Checking vote: {}", vote_bank.slot());
if !failure_reasons.is_empty() {
return failure_reasons;
if !heaviest_fork_failures.is_empty() {
return heaviest_fork_failures;
}
let vote = tower.new_vote_from_bank(&vote_bank, &my_vote_pubkey).0;
if let Some(new_root) = tower.record_bank_vote(vote) {
@ -905,6 +954,34 @@ pub mod test {
stakes
}
#[test]
fn test_to_vote_instruction() {
    // Exercise every `SwitchForkDecision` variant with default inputs.
    let vote = Vote::default();
    let vote_pubkey = Pubkey::default();
    let voter_pubkey = Pubkey::default();

    // A failed switch threshold must never yield a vote instruction.
    assert_eq!(
        SwitchForkDecision::FailedSwitchThreshold.to_vote_instruction(
            vote.clone(),
            &vote_pubkey,
            &voter_pubkey
        ),
        None
    );

    // Staying on the same fork yields a plain vote instruction.
    assert_eq!(
        SwitchForkDecision::NoSwitch.to_vote_instruction(
            vote.clone(),
            &vote_pubkey,
            &voter_pubkey
        ),
        Some(vote_instruction::vote(
            &vote_pubkey,
            &voter_pubkey,
            vote.clone(),
        ))
    );

    // Switching with a proof yields a vote-switch instruction that
    // carries the proof hash.
    assert_eq!(
        SwitchForkDecision::SwitchProof(Hash::default()).to_vote_instruction(
            vote.clone(),
            &vote_pubkey,
            &voter_pubkey
        ),
        Some(vote_instruction::vote_switch(
            &vote_pubkey,
            &voter_pubkey,
            vote,
            Hash::default()
        ))
    );
}
#[test]
fn test_simple_votes() {
// Init state
@ -975,85 +1052,106 @@ pub mod test {
tower.record_vote(47, Hash::default());
// Trying to switch to a descendant of last vote should always work
assert!(tower.check_switch_threshold(
48,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
));
assert_eq!(
tower.check_switch_threshold(
48,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::NoSwitch
);
// Trying to switch to another fork at 110 should fail
assert!(!tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
));
assert_eq!(
tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
);
// Adding another validator lockout on a descendant of last vote should
// not count toward the switch threshold
vote_simulator.simulate_lockout_interval(50, (49, 100), &other_vote_account);
assert!(!tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
));
assert_eq!(
tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
);
// Adding another validator lockout on an ancestor of last vote should
// not count toward the switch threshold
vote_simulator.simulate_lockout_interval(50, (45, 100), &other_vote_account);
assert!(!tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
));
assert_eq!(
tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
);
// Adding another validator lockout on a different fork, but the lockout
// doesn't cover the last vote, should not satisfy the switch threshold
vote_simulator.simulate_lockout_interval(14, (12, 46), &other_vote_account);
assert!(!tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
));
assert_eq!(
tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
);
// Adding another validator lockout on a different fork, and the lockout
// covers the last vote, should satisfy the switch threshold
vote_simulator.simulate_lockout_interval(14, (12, 47), &other_vote_account);
assert!(tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
));
assert_eq!(
tower.check_switch_threshold(
110,
&ancestors,
&descendants,
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::SwitchProof(Hash::default())
);
// If we set a root, then any lockout intervals below the root shouldn't
// count toward the switch threshold. This means the other validator's
// vote lockout no longer counts
vote_simulator.set_root(43);
assert!(!tower.check_switch_threshold(
110,
&vote_simulator.bank_forks.read().unwrap().ancestors(),
&vote_simulator.bank_forks.read().unwrap().descendants(),
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
));
assert_eq!(
tower.check_switch_threshold(
110,
&vote_simulator.bank_forks.read().unwrap().ancestors(),
&vote_simulator.bank_forks.read().unwrap().descendants(),
&vote_simulator.progress,
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
);
}
#[test]

View File

@ -6,7 +6,7 @@ use crate::{
cluster_info_vote_listener::VoteTracker,
cluster_slots::ClusterSlots,
commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData},
consensus::{StakeLockout, Tower},
consensus::{StakeLockout, SwitchForkDecision, Tower},
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
progress_map::{ForkProgress, ForkStats, ProgressMap, PropagatedStats},
pubkey_references::PubkeyReferences,
@ -55,6 +55,7 @@ use std::{
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;
pub const MAX_UNCONFIRMED_SLOTS: usize = 5;
pub const UNLOCK_SWITCH_VOTE_SLOT: Slot = 5_000_000;
#[derive(PartialEq, Debug)]
pub(crate) enum HeaviestForkFailures {
@ -139,6 +140,12 @@ impl ReplayTiming {
}
}
/// Named result of `ReplayStage::select_vote_and_reset_forks`, replacing
/// the previous anonymous 3-tuple return value.
pub(crate) struct SelectVoteAndResetForkResult {
    /// Bank to vote on (if any), paired with the switch decision that
    /// determines which vote instruction is built for it.
    pub vote_bank: Option<(Arc<Bank>, SwitchForkDecision)>,
    /// Bank to reset PoH to (if any); may differ from the vote bank when
    /// the switch threshold check fails and the validator stays on the
    /// fork of its last vote.
    pub reset_bank: Option<Arc<Bank>>,
    /// Reasons the heaviest bank could not be voted on (lockout, vote
    /// threshold, propagation, or switch-threshold failure), if any.
    pub heaviest_fork_failures: Vec<HeaviestForkFailures>,
}
pub struct ReplayStage {
t_replay: JoinHandle<Result<()>>,
commitment_service: AggregateCommitmentService,
@ -316,15 +323,18 @@ impl ReplayStage {
Self::report_memory(&allocated, "select_fork", start);
let now = Instant::now();
let (vote_bank, reset_bank, failure_reasons) =
Self::select_vote_and_reset_forks(
&heaviest_bank,
&heaviest_bank_on_same_fork,
&ancestors,
&descendants,
&progress,
&tower,
);
let SelectVoteAndResetForkResult {
vote_bank,
reset_bank,
heaviest_fork_failures,
} = Self::select_vote_and_reset_forks(
&heaviest_bank,
&heaviest_bank_on_same_fork,
&ancestors,
&descendants,
&progress,
&tower,
);
let select_vote_and_reset_forks_elapsed = now.elapsed().as_micros();
replay_timing.update(
compute_bank_stats_elapsed as u64,
@ -333,15 +343,15 @@ impl ReplayStage {
if heaviest_bank.is_some()
&& tower.is_recent(heaviest_bank.as_ref().unwrap().slot())
&& !failure_reasons.is_empty()
&& !heaviest_fork_failures.is_empty()
{
info!(
"Couldn't vote on heaviest fork: {:?}, failure_reasons: {:?}",
"Couldn't vote on heaviest fork: {:?}, heaviest_fork_failures: {:?}",
heaviest_bank.as_ref().map(|b| b.slot()),
failure_reasons
heaviest_fork_failures
);
for r in failure_reasons {
for r in heaviest_fork_failures {
if let HeaviestForkFailures::NoPropagatedConfirmation(slot) = r {
if let Some(latest_leader_slot) =
progress.get_latest_leader_slot(slot)
@ -355,7 +365,7 @@ impl ReplayStage {
let start = allocated.get();
// Vote on a fork
if let Some(ref vote_bank) = vote_bank {
if let Some((ref vote_bank, ref switch_fork_decision)) = vote_bank {
if let Some(votable_leader) =
leader_schedule_cache.slot_leader_at(vote_bank.slot(), Some(vote_bank))
{
@ -369,6 +379,7 @@ impl ReplayStage {
Self::handle_votable_bank(
&vote_bank,
switch_fork_decision,
&bank_forks,
&mut tower,
&mut progress,
@ -394,7 +405,10 @@ impl ReplayStage {
if last_reset != reset_bank.last_blockhash() {
info!(
"vote bank: {:?} reset bank: {:?}",
vote_bank.as_ref().map(|b| b.slot()),
vote_bank.as_ref().map(|(b, switch_fork_decision)| (
b.slot(),
switch_fork_decision
)),
reset_bank.slot(),
);
let fork_progress = progress
@ -420,7 +434,8 @@ impl ReplayStage {
tpu_has_bank = false;
if !partition
&& vote_bank.as_ref().map(|b| b.slot()) != Some(reset_bank.slot())
&& vote_bank.as_ref().map(|(b, _)| b.slot())
!= Some(reset_bank.slot())
{
warn!(
"PARTITION DETECTED waiting to join fork: {} last vote: {:?}",
@ -434,7 +449,8 @@ impl ReplayStage {
);
partition = true;
} else if partition
&& vote_bank.as_ref().map(|b| b.slot()) == Some(reset_bank.slot())
&& vote_bank.as_ref().map(|(b, _)| b.slot())
== Some(reset_bank.slot())
{
warn!(
"PARTITION resolved fork: {} last vote: {:?}",
@ -841,6 +857,7 @@ impl ReplayStage {
#[allow(clippy::too_many_arguments)]
fn handle_votable_bank(
bank: &Arc<Bank>,
switch_fork_decision: &SwitchForkDecision,
bank_forks: &Arc<RwLock<BankForks>>,
tower: &mut Tower,
progress: &mut ProgressMap,
@ -917,6 +934,7 @@ impl ReplayStage {
authorized_voter_keypairs,
tower.last_vote_and_timestamp(),
tower_index,
switch_fork_decision,
);
Ok(())
}
@ -928,6 +946,7 @@ impl ReplayStage {
authorized_voter_keypairs: &[Arc<Keypair>],
vote: Vote,
tower_index: usize,
switch_fork_decision: &SwitchForkDecision,
) {
if authorized_voter_keypairs.is_empty() {
return;
@ -978,11 +997,21 @@ impl ReplayStage {
let node_keypair = cluster_info.keypair.clone();
// Send our last few votes along with the new one
let vote_ix = vote_instruction::vote(
&vote_account_pubkey,
&authorized_voter_keypair.pubkey(),
vote,
);
let vote_ix = if bank.slot() > UNLOCK_SWITCH_VOTE_SLOT {
switch_fork_decision
.to_vote_instruction(
vote,
&vote_account_pubkey,
&authorized_voter_keypair.pubkey(),
)
.expect("Switch threshold failure should not lead to voting")
} else {
vote_instruction::vote(
&vote_account_pubkey,
&authorized_voter_keypair.pubkey(),
vote,
)
};
let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey()));
@ -1389,11 +1418,7 @@ impl ReplayStage {
descendants: &HashMap<u64, HashSet<u64>>,
progress: &ProgressMap,
tower: &Tower,
) -> (
Option<Arc<Bank>>,
Option<Arc<Bank>>,
Vec<HeaviestForkFailures>,
) {
) -> SelectVoteAndResetForkResult {
// Try to vote on the actual heaviest fork. If the heaviest bank is
// locked out or fails the threshold check, the validator will:
// 1) Not continue to vote on current fork, waiting for lockouts to expire/
@ -1410,7 +1435,7 @@ impl ReplayStage {
let mut failure_reasons = vec![];
let selected_fork = {
if let Some(bank) = heaviest_bank {
let switch_threshold = tower.check_switch_threshold(
let switch_fork_decision = tower.check_switch_threshold(
bank.slot(),
&ancestors,
&descendants,
@ -1420,30 +1445,30 @@ impl ReplayStage {
"Bank epoch vote accounts must contain entry for the bank's own epoch",
),
);
if !switch_threshold {
if switch_fork_decision == SwitchForkDecision::FailedSwitchThreshold {
// If we can't switch, then reset to the the next votable
// bank on the same fork as our last vote, but don't vote
info!(
"Waiting to switch to {}, voting on {:?} on same fork for now",
"Waiting to switch vote to {}, resetting to slot {:?} on same fork for now",
bank.slot(),
heaviest_bank_on_same_fork.as_ref().map(|b| b.slot())
);
failure_reasons.push(HeaviestForkFailures::FailedSwitchThreshold(bank.slot()));
heaviest_bank_on_same_fork
.as_ref()
.map(|b| (b, switch_threshold))
.map(|b| (b, switch_fork_decision))
} else {
// If the switch threshold is observed, halt voting on
// the current fork and attempt to vote/reset Poh to
// the heaviest bank
heaviest_bank.as_ref().map(|b| (b, switch_threshold))
heaviest_bank.as_ref().map(|b| (b, switch_fork_decision))
}
} else {
None
}
};
if let Some((bank, switch_threshold)) = selected_fork {
if let Some((bank, switch_fork_decision)) = selected_fork {
let (is_locked_out, vote_threshold, is_leader_slot, fork_weight) = {
let fork_stats = progress.get_fork_stats(bank.slot()).unwrap();
let propagated_stats = &progress.get_propagated_stats(bank.slot()).unwrap();
@ -1466,18 +1491,31 @@ impl ReplayStage {
if !propagation_confirmed {
failure_reasons.push(HeaviestForkFailures::NoPropagatedConfirmation(bank.slot()));
}
if !switch_threshold {
failure_reasons.push(HeaviestForkFailures::FailedSwitchThreshold(bank.slot()));
}
if !is_locked_out && vote_threshold && propagation_confirmed && switch_threshold {
if !is_locked_out
&& vote_threshold
&& propagation_confirmed
&& switch_fork_decision != SwitchForkDecision::FailedSwitchThreshold
{
info!("voting: {} {}", bank.slot(), fork_weight);
(Some(bank.clone()), Some(bank.clone()), failure_reasons)
SelectVoteAndResetForkResult {
vote_bank: Some((bank.clone(), switch_fork_decision)),
reset_bank: Some(bank.clone()),
heaviest_fork_failures: failure_reasons,
}
} else {
(None, Some(bank.clone()), failure_reasons)
SelectVoteAndResetForkResult {
vote_bank: None,
reset_bank: Some(bank.clone()),
heaviest_fork_failures: failure_reasons,
}
}
} else {
(None, None, failure_reasons)
SelectVoteAndResetForkResult {
vote_bank: None,
reset_bank: None,
heaviest_fork_failures: failure_reasons,
}
}
}