Add ReplayStage changes for checking switch threshold (#8504)
* Refactor for supporting switch threshold check
This commit is contained in:
parent
8dc4724340
commit
8ef8c9094a
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::replay_stage::ProgressMap;
|
||||||
use chrono::prelude::*;
|
use chrono::prelude::*;
|
||||||
use solana_ledger::bank_forks::BankForks;
|
use solana_ledger::bank_forks::BankForks;
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
|
@ -355,6 +356,17 @@ impl Tower {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn check_switch_threshold(
|
||||||
|
&self,
|
||||||
|
_slot: u64,
|
||||||
|
_ancestors: &HashMap<Slot, HashSet<u64>>,
|
||||||
|
_descendants: &HashMap<Slot, HashSet<u64>>,
|
||||||
|
_progress: &ProgressMap,
|
||||||
|
_total_stake: u64,
|
||||||
|
) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
/// Update lockouts for all the ancestors
|
/// Update lockouts for all the ancestors
|
||||||
fn update_ancestor_lockouts(
|
fn update_ancestor_lockouts(
|
||||||
stake_lockouts: &mut HashMap<Slot, StakeLockout>,
|
stake_lockouts: &mut HashMap<Slot, StakeLockout>,
|
||||||
|
@ -468,7 +480,7 @@ impl Tower {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub mod test {
|
pub mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::replay_stage::{ForkProgress, ReplayStage};
|
use crate::replay_stage::{ForkProgress, HeaviestForkFailures, ReplayStage};
|
||||||
use solana_ledger::bank_forks::BankForks;
|
use solana_ledger::bank_forks::BankForks;
|
||||||
use solana_runtime::{
|
use solana_runtime::{
|
||||||
bank::Bank,
|
bank::Bank,
|
||||||
|
@ -513,7 +525,7 @@ pub mod test {
|
||||||
my_keypairs: &ValidatorVoteKeypairs,
|
my_keypairs: &ValidatorVoteKeypairs,
|
||||||
progress: &mut HashMap<u64, ForkProgress>,
|
progress: &mut HashMap<u64, ForkProgress>,
|
||||||
tower: &mut Tower,
|
tower: &mut Tower,
|
||||||
) -> Vec<VoteFailures> {
|
) -> Vec<HeaviestForkFailures> {
|
||||||
let node = self
|
let node = self
|
||||||
.find_node_and_update_simulation(vote_slot)
|
.find_node_and_update_simulation(vote_slot)
|
||||||
.expect("Vote to simulate must be for a slot in the tree");
|
.expect("Vote to simulate must be for a slot in the tree");
|
||||||
|
@ -611,17 +623,17 @@ pub mod test {
|
||||||
info!("lockouts: {:?}", fork_progress.fork_stats.stake_lockouts);
|
info!("lockouts: {:?}", fork_progress.fork_stats.stake_lockouts);
|
||||||
let mut failures = vec![];
|
let mut failures = vec![];
|
||||||
if fork_progress.fork_stats.is_locked_out {
|
if fork_progress.fork_stats.is_locked_out {
|
||||||
failures.push(VoteFailures::LockedOut(vote_slot));
|
failures.push(HeaviestForkFailures::LockedOut(vote_slot));
|
||||||
}
|
}
|
||||||
if !fork_progress.fork_stats.vote_threshold {
|
if !fork_progress.fork_stats.vote_threshold {
|
||||||
failures.push(VoteFailures::FailedThreshold(vote_slot));
|
failures.push(HeaviestForkFailures::FailedThreshold(vote_slot));
|
||||||
}
|
}
|
||||||
if !failures.is_empty() {
|
if !failures.is_empty() {
|
||||||
return failures;
|
return failures;
|
||||||
}
|
}
|
||||||
let vote = tower.new_vote_from_bank(&bank, &my_vote_pubkey).0;
|
let vote = tower.new_vote_from_bank(&bank, &my_vote_pubkey).0;
|
||||||
if let Some(new_root) = tower.record_bank_vote(vote) {
|
if let Some(new_root) = tower.record_bank_vote(vote) {
|
||||||
ReplayStage::handle_new_root(new_root, bank_forks, progress, &None);
|
ReplayStage::handle_new_root(new_root, bank_forks, progress, &None, &mut 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark the vote for this bank under this node's pubkey so it will be
|
// Mark the vote for this bank under this node's pubkey so it will be
|
||||||
|
@ -671,12 +683,6 @@ pub mod test {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Debug)]
|
|
||||||
pub(crate) enum VoteFailures {
|
|
||||||
LockedOut(u64),
|
|
||||||
FailedThreshold(u64),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Setup BankForks with bank 0 and all the validator accounts
|
// Setup BankForks with bank 0 and all the validator accounts
|
||||||
pub(crate) fn initialize_state(
|
pub(crate) fn initialize_state(
|
||||||
validator_keypairs_map: &HashMap<Pubkey, ValidatorVoteKeypairs>,
|
validator_keypairs_map: &HashMap<Pubkey, ValidatorVoteKeypairs>,
|
||||||
|
|
|
@ -45,6 +45,14 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
|
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
|
||||||
|
pub(crate) type ProgressMap = HashMap<Slot, ForkProgress>;
|
||||||
|
|
||||||
|
#[derive(PartialEq, Debug)]
|
||||||
|
pub(crate) enum HeaviestForkFailures {
|
||||||
|
LockedOut(u64),
|
||||||
|
FailedThreshold(u64),
|
||||||
|
FailedSwitchThreshold(u64),
|
||||||
|
}
|
||||||
|
|
||||||
// Implement a destructor for the ReplayStage thread to signal it exited
|
// Implement a destructor for the ReplayStage thread to signal it exited
|
||||||
// even on panics
|
// even on panics
|
||||||
|
@ -207,6 +215,11 @@ impl ReplayStage {
|
||||||
let mut current_leader = None;
|
let mut current_leader = None;
|
||||||
let mut last_reset = Hash::default();
|
let mut last_reset = Hash::default();
|
||||||
let mut partition = false;
|
let mut partition = false;
|
||||||
|
let mut earliest_vote_on_fork = {
|
||||||
|
let slots = tower.last_vote().slots;
|
||||||
|
slots.last().cloned().unwrap_or(0)
|
||||||
|
};
|
||||||
|
let mut switch_threshold = false;
|
||||||
loop {
|
loop {
|
||||||
let allocated = thread_mem_usage::Allocatedp::default();
|
let allocated = thread_mem_usage::Allocatedp::default();
|
||||||
|
|
||||||
|
@ -242,6 +255,7 @@ impl ReplayStage {
|
||||||
Self::report_memory(&allocated, "replay_active_banks", start);
|
Self::report_memory(&allocated, "replay_active_banks", start);
|
||||||
|
|
||||||
let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
|
let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
|
||||||
|
let descendants = Arc::new(HashMap::new());
|
||||||
let start = allocated.get();
|
let start = allocated.get();
|
||||||
let mut frozen_banks: Vec<_> = bank_forks
|
let mut frozen_banks: Vec<_> = bank_forks
|
||||||
.read()
|
.read()
|
||||||
|
@ -276,95 +290,148 @@ impl ReplayStage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let vote_bank = Self::select_fork(&frozen_banks, &tower, &progress);
|
let (heaviest_bank, votable_bank_on_same_fork) =
|
||||||
|
Self::select_forks(&frozen_banks, &tower, &progress, &ancestors);
|
||||||
|
|
||||||
Self::report_memory(&allocated, "select_fork", start);
|
Self::report_memory(&allocated, "select_fork", start);
|
||||||
if vote_bank.is_none() {
|
|
||||||
break;
|
let (vote_bank, reset_bank, failure_reasons) =
|
||||||
|
Self::select_vote_and_reset_forks(
|
||||||
|
&heaviest_bank,
|
||||||
|
&votable_bank_on_same_fork,
|
||||||
|
earliest_vote_on_fork,
|
||||||
|
&mut switch_threshold,
|
||||||
|
&ancestors,
|
||||||
|
&descendants,
|
||||||
|
&progress,
|
||||||
|
&tower,
|
||||||
|
);
|
||||||
|
|
||||||
|
if heaviest_bank.is_some()
|
||||||
|
&& tower.is_recent(heaviest_bank.as_ref().unwrap().slot())
|
||||||
|
&& !failure_reasons.is_empty()
|
||||||
|
{
|
||||||
|
info!(
|
||||||
|
"Couldn't vote on heaviest fork: {:?}, failure_reasons: {:?}",
|
||||||
|
heaviest_bank.as_ref().map(|b| b.slot()),
|
||||||
|
failure_reasons
|
||||||
|
);
|
||||||
}
|
}
|
||||||
let bank = vote_bank.unwrap();
|
|
||||||
let (is_locked_out, vote_threshold, fork_weight, total_staked) = {
|
|
||||||
let fork_stats = &progress.get(&bank.slot()).unwrap().fork_stats;
|
|
||||||
(
|
|
||||||
fork_stats.is_locked_out,
|
|
||||||
fork_stats.vote_threshold,
|
|
||||||
fork_stats.weight,
|
|
||||||
fork_stats.total_staked,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
let mut vote_bank_slot = None;
|
|
||||||
let start = allocated.get();
|
let start = allocated.get();
|
||||||
if !is_locked_out && vote_threshold {
|
|
||||||
info!("voting: {} {}", bank.slot(), fork_weight);
|
// Vote on a fork
|
||||||
subscriptions.notify_subscribers(bank.slot(), &bank_forks);
|
let voted_on_different_fork = {
|
||||||
if let Some(votable_leader) =
|
if let Some(ref vote_bank) = vote_bank {
|
||||||
leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank))
|
subscriptions.notify_subscribers(vote_bank.slot(), &bank_forks);
|
||||||
{
|
if let Some(votable_leader) = leader_schedule_cache
|
||||||
Self::log_leader_change(
|
.slot_leader_at(vote_bank.slot(), Some(vote_bank))
|
||||||
&my_pubkey,
|
{
|
||||||
bank.slot(),
|
Self::log_leader_change(
|
||||||
&mut current_leader,
|
&my_pubkey,
|
||||||
&votable_leader,
|
vote_bank.slot(),
|
||||||
);
|
&mut current_leader,
|
||||||
|
&votable_leader,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Self::handle_votable_bank(
|
||||||
|
&vote_bank,
|
||||||
|
&bank_forks,
|
||||||
|
&mut tower,
|
||||||
|
&mut progress,
|
||||||
|
&vote_account,
|
||||||
|
&voting_keypair,
|
||||||
|
&cluster_info,
|
||||||
|
&blockstore,
|
||||||
|
&leader_schedule_cache,
|
||||||
|
&root_bank_sender,
|
||||||
|
&lockouts_sender,
|
||||||
|
&snapshot_package_sender,
|
||||||
|
&latest_root_senders,
|
||||||
|
&mut earliest_vote_on_fork,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
ancestors
|
||||||
|
.get(&vote_bank.slot())
|
||||||
|
.unwrap()
|
||||||
|
.contains(&earliest_vote_on_fork)
|
||||||
|
} else {
|
||||||
|
false
|
||||||
}
|
}
|
||||||
vote_bank_slot = Some(bank.slot());
|
};
|
||||||
Self::handle_votable_bank(
|
|
||||||
&bank,
|
|
||||||
&bank_forks,
|
|
||||||
&mut tower,
|
|
||||||
&mut progress,
|
|
||||||
&vote_account,
|
|
||||||
&voting_keypair,
|
|
||||||
&cluster_info,
|
|
||||||
&blockstore,
|
|
||||||
&leader_schedule_cache,
|
|
||||||
&root_bank_sender,
|
|
||||||
total_staked,
|
|
||||||
&lockouts_sender,
|
|
||||||
&snapshot_package_sender,
|
|
||||||
&latest_root_senders,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
Self::report_memory(&allocated, "votable_bank", start);
|
Self::report_memory(&allocated, "votable_bank", start);
|
||||||
let start = allocated.get();
|
let start = allocated.get();
|
||||||
if last_reset != bank.last_blockhash() {
|
|
||||||
Self::reset_poh_recorder(
|
// Reset onto a fork
|
||||||
&my_pubkey,
|
if let Some(reset_bank) = reset_bank {
|
||||||
&blockstore,
|
let selected_same_fork = ancestors
|
||||||
&bank,
|
.get(&reset_bank.slot())
|
||||||
&poh_recorder,
|
.unwrap()
|
||||||
&leader_schedule_cache,
|
.contains(&earliest_vote_on_fork);
|
||||||
);
|
if last_reset != reset_bank.last_blockhash()
|
||||||
last_reset = bank.last_blockhash();
|
&& (selected_same_fork || switch_threshold)
|
||||||
tpu_has_bank = false;
|
{
|
||||||
info!(
|
info!(
|
||||||
"vote bank: {:?} reset bank: {}",
|
"vote bank: {:?} reset bank: {:?}",
|
||||||
vote_bank_slot,
|
vote_bank.as_ref().map(|b| b.slot()),
|
||||||
bank.slot()
|
reset_bank.slot(),
|
||||||
);
|
|
||||||
if !partition && vote_bank_slot != Some(bank.slot()) {
|
|
||||||
warn!(
|
|
||||||
"PARTITION DETECTED waiting to join fork: {} last vote: {:?}",
|
|
||||||
bank.slot(),
|
|
||||||
tower.last_vote()
|
|
||||||
);
|
);
|
||||||
inc_new_counter_info!("replay_stage-partition_detected", 1);
|
Self::reset_poh_recorder(
|
||||||
datapoint_info!(
|
&my_pubkey,
|
||||||
"replay_stage-partition",
|
&blockstore,
|
||||||
("slot", bank.slot() as i64, i64)
|
&reset_bank,
|
||||||
|
&poh_recorder,
|
||||||
|
&leader_schedule_cache,
|
||||||
);
|
);
|
||||||
partition = true;
|
last_reset = reset_bank.last_blockhash();
|
||||||
} else if partition && vote_bank_slot == Some(bank.slot()) {
|
tpu_has_bank = false;
|
||||||
warn!(
|
|
||||||
"PARTITION resolved fork: {} last vote: {:?}",
|
if !partition
|
||||||
bank.slot(),
|
&& vote_bank.as_ref().map(|b| b.slot()) != Some(reset_bank.slot())
|
||||||
tower.last_vote()
|
{
|
||||||
);
|
warn!(
|
||||||
partition = false;
|
"PARTITION DETECTED waiting to join fork: {} last vote: {:?}",
|
||||||
inc_new_counter_info!("replay_stage-partition_resolved", 1);
|
reset_bank.slot(),
|
||||||
|
tower.last_vote()
|
||||||
|
);
|
||||||
|
inc_new_counter_info!("replay_stage-partition_detected", 1);
|
||||||
|
datapoint_info!(
|
||||||
|
"replay_stage-partition",
|
||||||
|
("slot", reset_bank.slot() as i64, i64)
|
||||||
|
);
|
||||||
|
partition = true;
|
||||||
|
} else if partition
|
||||||
|
&& vote_bank.as_ref().map(|b| b.slot()) == Some(reset_bank.slot())
|
||||||
|
{
|
||||||
|
warn!(
|
||||||
|
"PARTITION resolved fork: {} last vote: {:?}",
|
||||||
|
reset_bank.slot(),
|
||||||
|
tower.last_vote()
|
||||||
|
);
|
||||||
|
partition = false;
|
||||||
|
inc_new_counter_info!("replay_stage-partition_resolved", 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
datapoint_debug!(
|
||||||
|
"replay_stage-memory",
|
||||||
|
("reset_bank", (allocated.get() - start) as i64, i64),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Self::report_memory(&allocated, "reset_bank", start);
|
Self::report_memory(&allocated, "reset_bank", start);
|
||||||
|
|
||||||
|
// If we voted on a different fork, update the earliest vote
|
||||||
|
// to this slot, clear the switch threshold
|
||||||
|
if voted_on_different_fork {
|
||||||
|
earliest_vote_on_fork = vote_bank
|
||||||
|
.expect("voted_on_different_fork only set if vote_bank.is_some()")
|
||||||
|
.slot();
|
||||||
|
// Clear the thresholds after voting on different
|
||||||
|
// fork
|
||||||
|
switch_threshold = false;
|
||||||
|
}
|
||||||
|
|
||||||
let start = allocated.get();
|
let start = allocated.get();
|
||||||
if !tpu_has_bank {
|
if !tpu_has_bank {
|
||||||
Self::maybe_start_leader(
|
Self::maybe_start_leader(
|
||||||
|
@ -579,17 +646,17 @@ impl ReplayStage {
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
bank_forks: &Arc<RwLock<BankForks>>,
|
bank_forks: &Arc<RwLock<BankForks>>,
|
||||||
tower: &mut Tower,
|
tower: &mut Tower,
|
||||||
progress: &mut HashMap<u64, ForkProgress>,
|
progress: &mut ProgressMap,
|
||||||
vote_account: &Pubkey,
|
vote_account: &Pubkey,
|
||||||
voting_keypair: &Option<Arc<Keypair>>,
|
voting_keypair: &Option<Arc<Keypair>>,
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
blockstore: &Arc<Blockstore>,
|
blockstore: &Arc<Blockstore>,
|
||||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||||
root_bank_sender: &Sender<Vec<Arc<Bank>>>,
|
root_bank_sender: &Sender<Vec<Arc<Bank>>>,
|
||||||
total_staked: u64,
|
|
||||||
lockouts_sender: &Sender<CommitmentAggregationData>,
|
lockouts_sender: &Sender<CommitmentAggregationData>,
|
||||||
snapshot_package_sender: &Option<SnapshotPackageSender>,
|
snapshot_package_sender: &Option<SnapshotPackageSender>,
|
||||||
latest_root_senders: &[Sender<Slot>],
|
latest_root_senders: &[Sender<Slot>],
|
||||||
|
earliest_vote_on_fork: &mut Slot,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
if bank.is_empty() {
|
if bank.is_empty() {
|
||||||
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
|
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
|
||||||
|
@ -615,7 +682,13 @@ impl ReplayStage {
|
||||||
blockstore
|
blockstore
|
||||||
.set_roots(&rooted_slots)
|
.set_roots(&rooted_slots)
|
||||||
.expect("Ledger set roots failed");
|
.expect("Ledger set roots failed");
|
||||||
Self::handle_new_root(new_root, &bank_forks, progress, snapshot_package_sender);
|
Self::handle_new_root(
|
||||||
|
new_root,
|
||||||
|
&bank_forks,
|
||||||
|
progress,
|
||||||
|
snapshot_package_sender,
|
||||||
|
earliest_vote_on_fork,
|
||||||
|
);
|
||||||
latest_root_senders.iter().for_each(|s| {
|
latest_root_senders.iter().for_each(|s| {
|
||||||
if let Err(e) = s.send(new_root) {
|
if let Err(e) = s.send(new_root) {
|
||||||
trace!("latest root send failed: {:?}", e);
|
trace!("latest root send failed: {:?}", e);
|
||||||
|
@ -627,7 +700,12 @@ impl ReplayStage {
|
||||||
return Err(e.into());
|
return Err(e.into());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Self::update_commitment_cache(bank.clone(), total_staked, lockouts_sender);
|
|
||||||
|
Self::update_commitment_cache(
|
||||||
|
bank.clone(),
|
||||||
|
progress.get(&bank.slot()).unwrap().fork_stats.total_staked,
|
||||||
|
lockouts_sender,
|
||||||
|
);
|
||||||
|
|
||||||
if let Some(ref voting_keypair) = voting_keypair {
|
if let Some(ref voting_keypair) = voting_keypair {
|
||||||
let node_keypair = cluster_info.read().unwrap().keypair.clone();
|
let node_keypair = cluster_info.read().unwrap().keypair.clone();
|
||||||
|
@ -701,7 +779,7 @@ impl ReplayStage {
|
||||||
blockstore: &Arc<Blockstore>,
|
blockstore: &Arc<Blockstore>,
|
||||||
bank_forks: &Arc<RwLock<BankForks>>,
|
bank_forks: &Arc<RwLock<BankForks>>,
|
||||||
my_pubkey: &Pubkey,
|
my_pubkey: &Pubkey,
|
||||||
progress: &mut HashMap<u64, ForkProgress>,
|
progress: &mut ProgressMap,
|
||||||
slot_full_senders: &[Sender<(u64, Pubkey)>],
|
slot_full_senders: &[Sender<(u64, Pubkey)>],
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
verify_recyclers: &VerifyRecyclers,
|
verify_recyclers: &VerifyRecyclers,
|
||||||
|
@ -722,7 +800,7 @@ impl ReplayStage {
|
||||||
|
|
||||||
// Insert a progress entry even for slots this node is the leader for, so that
|
// Insert a progress entry even for slots this node is the leader for, so that
|
||||||
// 1) confirm_forks can report confirmation, 2) we can cache computations about
|
// 1) confirm_forks can report confirmation, 2) we can cache computations about
|
||||||
// this bank in `select_fork()`
|
// this bank in `select_forks()`
|
||||||
let bank_progress = &mut progress
|
let bank_progress = &mut progress
|
||||||
.entry(bank.slot())
|
.entry(bank.slot())
|
||||||
.or_insert_with(|| ForkProgress::new(bank.last_blockhash()));
|
.or_insert_with(|| ForkProgress::new(bank.last_blockhash()));
|
||||||
|
@ -771,10 +849,10 @@ impl ReplayStage {
|
||||||
ancestors: &HashMap<u64, HashSet<u64>>,
|
ancestors: &HashMap<u64, HashSet<u64>>,
|
||||||
frozen_banks: &mut Vec<Arc<Bank>>,
|
frozen_banks: &mut Vec<Arc<Bank>>,
|
||||||
tower: &Tower,
|
tower: &Tower,
|
||||||
progress: &mut HashMap<u64, ForkProgress>,
|
progress: &mut ProgressMap,
|
||||||
) -> Vec<Slot> {
|
) -> Vec<Slot> {
|
||||||
frozen_banks.sort_by_key(|bank| bank.slot());
|
frozen_banks.sort_by_key(|bank| bank.slot());
|
||||||
let mut new_stats = vec![];
|
let new_stats = vec![];
|
||||||
for bank in frozen_banks {
|
for bank in frozen_banks {
|
||||||
// Only time progress map should be missing a bank slot
|
// Only time progress map should be missing a bank slot
|
||||||
// is if this node was the leader for this slot as those banks
|
// is if this node was the leader for this slot as those banks
|
||||||
|
@ -826,16 +904,19 @@ impl ReplayStage {
|
||||||
stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors);
|
stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors);
|
||||||
stats.has_voted = tower.has_voted(bank.slot());
|
stats.has_voted = tower.has_voted(bank.slot());
|
||||||
stats.is_recent = tower.is_recent(bank.slot());
|
stats.is_recent = tower.is_recent(bank.slot());
|
||||||
new_stats.push(stats.slot);
|
|
||||||
}
|
}
|
||||||
new_stats
|
new_stats
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn select_fork(
|
// Returns:
|
||||||
|
// 1) The heaviest bank
|
||||||
|
// 2) The latest votable bank on the same fork as the last vote
|
||||||
|
pub(crate) fn select_forks(
|
||||||
frozen_banks: &[Arc<Bank>],
|
frozen_banks: &[Arc<Bank>],
|
||||||
tower: &Tower,
|
tower: &Tower,
|
||||||
progress: &HashMap<u64, ForkProgress>,
|
progress: &ProgressMap,
|
||||||
) -> Option<Arc<Bank>> {
|
ancestors: &HashMap<u64, HashSet<u64>>,
|
||||||
|
) -> (Option<Arc<Bank>>, Option<Arc<Bank>>) {
|
||||||
let tower_start = Instant::now();
|
let tower_start = Instant::now();
|
||||||
let num_frozen_banks = frozen_banks.len();
|
let num_frozen_banks = frozen_banks.len();
|
||||||
|
|
||||||
|
@ -845,16 +926,37 @@ impl ReplayStage {
|
||||||
.filter(|b| b.slot() < tower.root().unwrap_or(0))
|
.filter(|b| b.slot() < tower.root().unwrap_or(0))
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
|
let last_vote = tower.last_vote().slots.last().cloned();
|
||||||
|
let mut last_votable_on_same_fork = None;
|
||||||
let stats: Vec<&ForkStats> = frozen_banks
|
let stats: Vec<&ForkStats> = frozen_banks
|
||||||
.iter()
|
.iter()
|
||||||
.map(|bank| {
|
.map(|bank| {
|
||||||
// Only time progress map should be missing a bank slot
|
// Only time progress map should be missing a bank slot
|
||||||
// is if this node was the leader for this slot as those banks
|
// is if this node was the leader for this slot as those banks
|
||||||
// are not replayed in replay_active_banks()
|
// are not replayed in replay_active_banks()
|
||||||
&progress
|
let stats = &progress
|
||||||
.get(&bank.slot())
|
.get(&bank.slot())
|
||||||
.expect("All frozen banks must exist in the Progress map")
|
.expect("All frozen banks must exist in the Progress map")
|
||||||
.fork_stats
|
.fork_stats;
|
||||||
|
|
||||||
|
if let Some(last_vote) = last_vote {
|
||||||
|
if ancestors
|
||||||
|
.get(&bank.slot())
|
||||||
|
.expect("Entry in frozen banks must exist in ancestors")
|
||||||
|
.contains(&last_vote)
|
||||||
|
&& stats.vote_threshold
|
||||||
|
{
|
||||||
|
// Descendant of last vote cannot be locked out
|
||||||
|
assert!(!stats.is_locked_out);
|
||||||
|
|
||||||
|
// ancestors(slot) should not contain the slot itself,
|
||||||
|
// so we shouldd never get the same bank as the last vote
|
||||||
|
assert_ne!(bank.slot(), last_vote);
|
||||||
|
last_votable_on_same_fork = Some(bank.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stats
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
let num_not_recent = stats.iter().filter(|s| !s.is_recent).count();
|
let num_not_recent = stats.iter().filter(|s| !s.is_recent).count();
|
||||||
|
@ -886,7 +988,7 @@ impl ReplayStage {
|
||||||
rv.is_some()
|
rv.is_some()
|
||||||
);
|
);
|
||||||
datapoint_debug!(
|
datapoint_debug!(
|
||||||
"replay_stage-select_fork",
|
"replay_stage-select_forks",
|
||||||
("frozen_banks", num_frozen_banks as i64, i64),
|
("frozen_banks", num_frozen_banks as i64, i64),
|
||||||
("not_recent", num_not_recent as i64, i64),
|
("not_recent", num_not_recent as i64, i64),
|
||||||
("has_voted", num_has_voted as i64, i64),
|
("has_voted", num_has_voted as i64, i64),
|
||||||
|
@ -900,14 +1002,124 @@ impl ReplayStage {
|
||||||
),
|
),
|
||||||
("tower_duration", ms as i64, i64),
|
("tower_duration", ms as i64, i64),
|
||||||
);
|
);
|
||||||
rv.map(|x| x.0.clone())
|
|
||||||
|
(rv.map(|x| x.0.clone()), last_votable_on_same_fork)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Given a heaviest bank, `heaviest_bank` and the next votable bank
|
||||||
|
// `votable_bank_on_same_fork` as the validator's last vote, return
|
||||||
|
// a bank to vote on, a bank to reset to,
|
||||||
|
pub(crate) fn select_vote_and_reset_forks(
|
||||||
|
heaviest_bank: &Option<Arc<Bank>>,
|
||||||
|
votable_bank_on_same_fork: &Option<Arc<Bank>>,
|
||||||
|
earliest_vote_on_fork: u64,
|
||||||
|
switch_threshold: &mut bool,
|
||||||
|
ancestors: &HashMap<u64, HashSet<u64>>,
|
||||||
|
descendants: &HashMap<u64, HashSet<u64>>,
|
||||||
|
progress: &ProgressMap,
|
||||||
|
tower: &Tower,
|
||||||
|
) -> (
|
||||||
|
Option<Arc<Bank>>,
|
||||||
|
Option<Arc<Bank>>,
|
||||||
|
Vec<HeaviestForkFailures>,
|
||||||
|
) {
|
||||||
|
// Try to vote on the actual heaviest fork. If the heaviest bank is
|
||||||
|
// locked out or fails the threshold check, the validator will:
|
||||||
|
// 1) Not continue to vote on current fork, waiting for lockouts to expire/
|
||||||
|
// threshold check to pass
|
||||||
|
// 2) Will reset PoH to heaviest fork in order to make sure the heaviest
|
||||||
|
// fork is propagated
|
||||||
|
// This above behavior should ensure correct voting and resetting PoH
|
||||||
|
// behavior under all cases:
|
||||||
|
// 1) The best "selected" bank is on same fork
|
||||||
|
// 2) The best "selected" bank is on a different fork,
|
||||||
|
// switch_threshold fails
|
||||||
|
// 3) The best "selected" bank is on a different fork,
|
||||||
|
// switch_threshold succceeds
|
||||||
|
let mut failure_reasons = vec![];
|
||||||
|
let selected_fork = {
|
||||||
|
if let Some(bank) = heaviest_bank {
|
||||||
|
let selected_same_fork = ancestors
|
||||||
|
.get(&bank.slot())
|
||||||
|
.unwrap()
|
||||||
|
.contains(&earliest_vote_on_fork);
|
||||||
|
if selected_same_fork {
|
||||||
|
// If the heaviest bank is on the same fork as the last
|
||||||
|
// vote, then there's no need to check the switch threshold.
|
||||||
|
// Just vote for the latest votable bank on the same fork,
|
||||||
|
// which is `votable_bank_on_same_fork`.
|
||||||
|
votable_bank_on_same_fork
|
||||||
|
} else {
|
||||||
|
if !*switch_threshold {
|
||||||
|
let total_staked =
|
||||||
|
progress.get(&bank.slot()).unwrap().fork_stats.total_staked;
|
||||||
|
*switch_threshold = tower.check_switch_threshold(
|
||||||
|
earliest_vote_on_fork,
|
||||||
|
&ancestors,
|
||||||
|
&descendants,
|
||||||
|
&progress,
|
||||||
|
total_staked,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if !*switch_threshold {
|
||||||
|
// If we can't switch, then vote on the the next votable
|
||||||
|
// bank on the same fork as our last vote
|
||||||
|
info!(
|
||||||
|
"Waiting to switch to {}, voting on {:?} on same fork for now",
|
||||||
|
bank.slot(),
|
||||||
|
votable_bank_on_same_fork.as_ref().map(|b| b.slot())
|
||||||
|
);
|
||||||
|
failure_reasons
|
||||||
|
.push(HeaviestForkFailures::FailedSwitchThreshold(bank.slot()));
|
||||||
|
votable_bank_on_same_fork
|
||||||
|
} else {
|
||||||
|
// If the switch threshold is observed, halt voting on
|
||||||
|
// the current fork and attempt to vote/reset Poh/switch to
|
||||||
|
// theh heaviest bank
|
||||||
|
heaviest_bank
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
&None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(bank) = selected_fork {
|
||||||
|
let (is_locked_out, vote_threshold, fork_weight) = {
|
||||||
|
let fork_stats = &progress.get(&bank.slot()).unwrap().fork_stats;
|
||||||
|
(
|
||||||
|
fork_stats.is_locked_out,
|
||||||
|
fork_stats.vote_threshold,
|
||||||
|
fork_stats.weight,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
if is_locked_out {
|
||||||
|
failure_reasons.push(HeaviestForkFailures::LockedOut(bank.slot()));
|
||||||
|
}
|
||||||
|
if !vote_threshold {
|
||||||
|
failure_reasons.push(HeaviestForkFailures::FailedThreshold(bank.slot()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !is_locked_out && vote_threshold {
|
||||||
|
info!("voting: {} {}", bank.slot(), fork_weight);
|
||||||
|
(
|
||||||
|
selected_fork.clone(),
|
||||||
|
selected_fork.clone(),
|
||||||
|
failure_reasons,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
(None, selected_fork.clone(), failure_reasons)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
(None, None, failure_reasons)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn confirm_forks(
|
fn confirm_forks(
|
||||||
tower: &Tower,
|
tower: &Tower,
|
||||||
stake_lockouts: &HashMap<u64, StakeLockout>,
|
stake_lockouts: &HashMap<u64, StakeLockout>,
|
||||||
total_staked: u64,
|
total_staked: u64,
|
||||||
progress: &HashMap<u64, ForkProgress>,
|
progress: &ProgressMap,
|
||||||
bank_forks: &RwLock<BankForks>,
|
bank_forks: &RwLock<BankForks>,
|
||||||
) -> Vec<Slot> {
|
) -> Vec<Slot> {
|
||||||
let mut confirmed_forks = vec![];
|
let mut confirmed_forks = vec![];
|
||||||
|
@ -941,14 +1153,16 @@ impl ReplayStage {
|
||||||
pub(crate) fn handle_new_root(
|
pub(crate) fn handle_new_root(
|
||||||
new_root: u64,
|
new_root: u64,
|
||||||
bank_forks: &RwLock<BankForks>,
|
bank_forks: &RwLock<BankForks>,
|
||||||
progress: &mut HashMap<u64, ForkProgress>,
|
progress: &mut ProgressMap,
|
||||||
snapshot_package_sender: &Option<SnapshotPackageSender>,
|
snapshot_package_sender: &Option<SnapshotPackageSender>,
|
||||||
|
earliest_vote_on_fork: &mut u64,
|
||||||
) {
|
) {
|
||||||
bank_forks
|
bank_forks
|
||||||
.write()
|
.write()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.set_root(new_root, snapshot_package_sender);
|
.set_root(new_root, snapshot_package_sender);
|
||||||
let r_bank_forks = bank_forks.read().unwrap();
|
let r_bank_forks = bank_forks.read().unwrap();
|
||||||
|
*earliest_vote_on_fork = std::cmp::max(new_root, *earliest_vote_on_fork);
|
||||||
progress.retain(|k, _| r_bank_forks.get(*k).is_some());
|
progress.retain(|k, _| r_bank_forks.get(*k).is_some());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1303,13 +1517,17 @@ pub(crate) mod tests {
|
||||||
&towers[i],
|
&towers[i],
|
||||||
&mut fork_progresses[i],
|
&mut fork_progresses[i],
|
||||||
);
|
);
|
||||||
let response =
|
let (heaviest_bank, _) = ReplayStage::select_forks(
|
||||||
ReplayStage::select_fork(&frozen_banks, &towers[i], &fork_progresses[i]);
|
&frozen_banks,
|
||||||
|
&towers[i],
|
||||||
|
&mut fork_progresses[i],
|
||||||
|
&bank_fork_ancestors,
|
||||||
|
);
|
||||||
|
|
||||||
if response.is_none() {
|
if heaviest_bank.is_none() {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
let bank = response.unwrap();
|
let bank = heaviest_bank.unwrap();
|
||||||
let stats = &fork_progresses[i].get(&bank.slot()).unwrap().fork_stats;
|
let stats = &fork_progresses[i].get(&bank.slot()).unwrap().fork_stats;
|
||||||
Some(ForkSelectionResponse {
|
Some(ForkSelectionResponse {
|
||||||
slot: stats.slot,
|
slot: stats.slot,
|
||||||
|
@ -1445,10 +1663,28 @@ pub(crate) mod tests {
|
||||||
for i in 0..=root {
|
for i in 0..=root {
|
||||||
progress.insert(i, ForkProgress::new(Hash::default()));
|
progress.insert(i, ForkProgress::new(Hash::default()));
|
||||||
}
|
}
|
||||||
ReplayStage::handle_new_root(root, &bank_forks, &mut progress, &None);
|
let mut earliest_vote_on_fork = root - 1;
|
||||||
|
ReplayStage::handle_new_root(
|
||||||
|
root,
|
||||||
|
&bank_forks,
|
||||||
|
&mut progress,
|
||||||
|
&None,
|
||||||
|
&mut earliest_vote_on_fork,
|
||||||
|
);
|
||||||
assert_eq!(bank_forks.read().unwrap().root(), root);
|
assert_eq!(bank_forks.read().unwrap().root(), root);
|
||||||
assert_eq!(progress.len(), 1);
|
assert_eq!(progress.len(), 1);
|
||||||
|
assert_eq!(earliest_vote_on_fork, root);
|
||||||
assert!(progress.get(&root).is_some());
|
assert!(progress.get(&root).is_some());
|
||||||
|
|
||||||
|
earliest_vote_on_fork = root + 1;
|
||||||
|
ReplayStage::handle_new_root(
|
||||||
|
root,
|
||||||
|
&bank_forks,
|
||||||
|
&mut progress,
|
||||||
|
&None,
|
||||||
|
&mut earliest_vote_on_fork,
|
||||||
|
);
|
||||||
|
assert_eq!(earliest_vote_on_fork, root + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
Loading…
Reference in New Issue