diff --git a/core/src/consensus.rs b/core/src/consensus.rs index fcc9cc7b94..8acc218ab3 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1,3 +1,4 @@ +use crate::replay_stage::ProgressMap; use chrono::prelude::*; use solana_ledger::bank_forks::BankForks; use solana_runtime::bank::Bank; @@ -355,6 +356,17 @@ impl Tower { } } + pub(crate) fn check_switch_threshold( + &self, + _slot: u64, + _ancestors: &HashMap>, + _descendants: &HashMap>, + _progress: &ProgressMap, + _total_stake: u64, + ) -> bool { + true + } + /// Update lockouts for all the ancestors fn update_ancestor_lockouts( stake_lockouts: &mut HashMap, @@ -468,7 +480,7 @@ impl Tower { #[cfg(test)] pub mod test { use super::*; - use crate::replay_stage::{ForkProgress, ReplayStage}; + use crate::replay_stage::{ForkProgress, HeaviestForkFailures, ReplayStage}; use solana_ledger::bank_forks::BankForks; use solana_runtime::{ bank::Bank, @@ -513,7 +525,7 @@ pub mod test { my_keypairs: &ValidatorVoteKeypairs, progress: &mut HashMap, tower: &mut Tower, - ) -> Vec { + ) -> Vec { let node = self .find_node_and_update_simulation(vote_slot) .expect("Vote to simulate must be for a slot in the tree"); @@ -611,17 +623,17 @@ pub mod test { info!("lockouts: {:?}", fork_progress.fork_stats.stake_lockouts); let mut failures = vec![]; if fork_progress.fork_stats.is_locked_out { - failures.push(VoteFailures::LockedOut(vote_slot)); + failures.push(HeaviestForkFailures::LockedOut(vote_slot)); } if !fork_progress.fork_stats.vote_threshold { - failures.push(VoteFailures::FailedThreshold(vote_slot)); + failures.push(HeaviestForkFailures::FailedThreshold(vote_slot)); } if !failures.is_empty() { return failures; } let vote = tower.new_vote_from_bank(&bank, &my_vote_pubkey).0; if let Some(new_root) = tower.record_bank_vote(vote) { - ReplayStage::handle_new_root(new_root, bank_forks, progress, &None); + ReplayStage::handle_new_root(new_root, bank_forks, progress, &None, &mut 0); } // Mark the vote for this bank under this node's pubkey so it will be @@ -671,12 +683,6 @@ pub mod test { } } - #[derive(PartialEq, Debug)] - pub(crate) enum VoteFailures { - LockedOut(u64), - FailedThreshold(u64), - } - // Setup BankForks with bank 0 and all the validator accounts pub(crate) fn initialize_state( validator_keypairs_map: &HashMap, diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index c21ba857e9..f3767875b5 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -45,6 +45,14 @@ use std::{ }; pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; +pub(crate) type ProgressMap = HashMap; + +#[derive(PartialEq, Debug)] +pub(crate) enum HeaviestForkFailures { + LockedOut(u64), + FailedThreshold(u64), + FailedSwitchThreshold(u64), +} // Implement a destructor for the ReplayStage thread to signal it exited // even on panics @@ -207,6 +215,11 @@ impl ReplayStage { let mut current_leader = None; let mut last_reset = Hash::default(); let mut partition = false; + let mut earliest_vote_on_fork = { + let slots = tower.last_vote().slots; + slots.last().cloned().unwrap_or(0) + }; + let mut switch_threshold = false; loop { let allocated = thread_mem_usage::Allocatedp::default(); @@ -242,6 +255,7 @@ impl ReplayStage { Self::report_memory(&allocated, "replay_active_banks", start); let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); + let descendants = Arc::new(HashMap::new()); let start = allocated.get(); let mut frozen_banks: Vec<_> = bank_forks .read() @@ -276,95 +290,148 @@ impl ReplayStage { } } - let vote_bank = Self::select_fork(&frozen_banks, &tower, &progress); + let (heaviest_bank, votable_bank_on_same_fork) = + Self::select_forks(&frozen_banks, &tower, &progress, &ancestors); + Self::report_memory(&allocated, "select_fork", start); - if vote_bank.is_none() { - break; + + let (vote_bank, reset_bank, failure_reasons) = + Self::select_vote_and_reset_forks( + &heaviest_bank, + &votable_bank_on_same_fork, + earliest_vote_on_fork, + &mut switch_threshold, + &ancestors, + &descendants, + &progress, + &tower, + ); + + if heaviest_bank.is_some() + && tower.is_recent(heaviest_bank.as_ref().unwrap().slot()) + && !failure_reasons.is_empty() + { + info!( + "Couldn't vote on heaviest fork: {:?}, failure_reasons: {:?}", + heaviest_bank.as_ref().map(|b| b.slot()), + failure_reasons + ); } - let bank = vote_bank.unwrap(); - let (is_locked_out, vote_threshold, fork_weight, total_staked) = { - let fork_stats = &progress.get(&bank.slot()).unwrap().fork_stats; - ( - fork_stats.is_locked_out, - fork_stats.vote_threshold, - fork_stats.weight, - fork_stats.total_staked, - ) - }; - let mut vote_bank_slot = None; + let start = allocated.get(); - if !is_locked_out && vote_threshold { - info!("voting: {} {}", bank.slot(), fork_weight); - subscriptions.notify_subscribers(bank.slot(), &bank_forks); - if let Some(votable_leader) = - leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank)) - { - Self::log_leader_change( - &my_pubkey, - bank.slot(), - &mut current_leader, - &votable_leader, - ); + + // Vote on a fork + let voted_on_different_fork = { + if let Some(ref vote_bank) = vote_bank { + subscriptions.notify_subscribers(vote_bank.slot(), &bank_forks); + if let Some(votable_leader) = leader_schedule_cache + .slot_leader_at(vote_bank.slot(), Some(vote_bank)) + { + Self::log_leader_change( + &my_pubkey, + vote_bank.slot(), + &mut current_leader, + &votable_leader, + ); + } + + Self::handle_votable_bank( + &vote_bank, + &bank_forks, + &mut tower, + &mut progress, + &vote_account, + &voting_keypair, + &cluster_info, + &blockstore, + &leader_schedule_cache, + &root_bank_sender, + &lockouts_sender, + &snapshot_package_sender, + &latest_root_senders, + &mut earliest_vote_on_fork, + )?; + + ancestors + .get(&vote_bank.slot()) + .unwrap() + .contains(&earliest_vote_on_fork) + } else { + false } - vote_bank_slot = Some(bank.slot()); - Self::handle_votable_bank( - &bank, - &bank_forks, - &mut tower, - &mut progress, - &vote_account, - &voting_keypair, - &cluster_info, - &blockstore, - &leader_schedule_cache, - &root_bank_sender, - total_staked, - &lockouts_sender, - &snapshot_package_sender, - &latest_root_senders, - )?; - } + }; + Self::report_memory(&allocated, "votable_bank", start); let start = allocated.get(); - if last_reset != bank.last_blockhash() { - Self::reset_poh_recorder( - &my_pubkey, - &blockstore, - &bank, - &poh_recorder, - &leader_schedule_cache, - ); - last_reset = bank.last_blockhash(); - tpu_has_bank = false; - info!( - "vote bank: {:?} reset bank: {}", - vote_bank_slot, - bank.slot() - ); - if !partition && vote_bank_slot != Some(bank.slot()) { - warn!( - "PARTITION DETECTED waiting to join fork: {} last vote: {:?}", - bank.slot(), - tower.last_vote() + + // Reset onto a fork + if let Some(reset_bank) = reset_bank { + let selected_same_fork = ancestors + .get(&reset_bank.slot()) + .unwrap() + .contains(&earliest_vote_on_fork); + if last_reset != reset_bank.last_blockhash() + && (selected_same_fork || switch_threshold) + { + info!( + "vote bank: {:?} reset bank: {:?}", + vote_bank.as_ref().map(|b| b.slot()), + reset_bank.slot(), ); - inc_new_counter_info!("replay_stage-partition_detected", 1); - datapoint_info!( - "replay_stage-partition", - ("slot", bank.slot() as i64, i64) + Self::reset_poh_recorder( + &my_pubkey, + &blockstore, + &reset_bank, + &poh_recorder, + &leader_schedule_cache, ); - partition = true; - } else if partition && vote_bank_slot == Some(bank.slot()) { - warn!( - "PARTITION resolved fork: {} last vote: {:?}", - bank.slot(), - tower.last_vote() - ); - partition = false; - inc_new_counter_info!("replay_stage-partition_resolved", 1); + last_reset = reset_bank.last_blockhash(); + tpu_has_bank = false; + + if !partition + && vote_bank.as_ref().map(|b| b.slot()) != Some(reset_bank.slot()) + { + warn!( + "PARTITION DETECTED waiting to join fork: {} last vote: {:?}", + reset_bank.slot(), + tower.last_vote() + ); + inc_new_counter_info!("replay_stage-partition_detected", 1); + datapoint_info!( + "replay_stage-partition", + ("slot", reset_bank.slot() as i64, i64) + ); + partition = true; + } else if partition + && vote_bank.as_ref().map(|b| b.slot()) == Some(reset_bank.slot()) + { + warn!( + "PARTITION resolved fork: {} last vote: {:?}", + reset_bank.slot(), + tower.last_vote() + ); + partition = false; + inc_new_counter_info!("replay_stage-partition_resolved", 1); + } } + datapoint_debug!( + "replay_stage-memory", + ("reset_bank", (allocated.get() - start) as i64, i64), + ); } Self::report_memory(&allocated, "reset_bank", start); + // If we voted on a different fork, update the earliest vote + // to this slot, clear the switch threshold + if voted_on_different_fork { + earliest_vote_on_fork = vote_bank + .expect("voted_on_different_fork only set if vote_bank.is_some()") + .slot(); + // Clear the thresholds after voting on different + // fork + switch_threshold = false; + } + let start = allocated.get(); if !tpu_has_bank { Self::maybe_start_leader( @@ -579,17 +646,17 @@ impl ReplayStage { bank: &Arc, bank_forks: &Arc>, tower: &mut Tower, - progress: &mut HashMap, + progress: &mut ProgressMap, vote_account: &Pubkey, voting_keypair: &Option>, cluster_info: &Arc>, blockstore: &Arc, leader_schedule_cache: &Arc, root_bank_sender: &Sender>>, - total_staked: u64, lockouts_sender: &Sender, snapshot_package_sender: &Option, latest_root_senders: &[Sender], + earliest_vote_on_fork: &mut Slot, ) -> Result<()> { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -615,7 +682,13 @@ impl ReplayStage { blockstore .set_roots(&rooted_slots) .expect("Ledger set roots failed"); - Self::handle_new_root(new_root, &bank_forks, progress, snapshot_package_sender); + Self::handle_new_root( + new_root, + &bank_forks, + progress, + snapshot_package_sender, + earliest_vote_on_fork, + ); latest_root_senders.iter().for_each(|s| { if let Err(e) = s.send(new_root) { trace!("latest root send failed: {:?}", e); @@ -627,7 +700,12 @@ impl ReplayStage { return Err(e.into()); } } - Self::update_commitment_cache(bank.clone(), total_staked, lockouts_sender); + + Self::update_commitment_cache( + bank.clone(), + progress.get(&bank.slot()).unwrap().fork_stats.total_staked, + lockouts_sender, + ); if let Some(ref voting_keypair) = voting_keypair { let node_keypair = cluster_info.read().unwrap().keypair.clone(); @@ -701,7 +779,7 @@ impl ReplayStage { blockstore: &Arc, bank_forks: &Arc>, my_pubkey: &Pubkey, - progress: &mut HashMap, + progress: &mut ProgressMap, slot_full_senders: &[Sender<(u64, Pubkey)>], transaction_status_sender: Option, verify_recyclers: &VerifyRecyclers, @@ -722,7 +800,7 @@ impl ReplayStage { // Insert a progress entry even for slots this node is the leader for, so that // 1) confirm_forks can report confirmation, 2) we can cache computations about - // this bank in `select_fork()` + // this bank in `select_forks()` let bank_progress = &mut progress .entry(bank.slot()) .or_insert_with(|| ForkProgress::new(bank.last_blockhash())); @@ -771,10 +849,10 @@ impl ReplayStage { ancestors: &HashMap>, frozen_banks: &mut Vec>, tower: &Tower, - progress: &mut HashMap, + progress: &mut ProgressMap, ) -> Vec { frozen_banks.sort_by_key(|bank| bank.slot()); - let mut new_stats = vec![]; + let new_stats = vec![]; for bank in frozen_banks { // Only time progress map should be missing a bank slot // is if this node was the leader for this slot as those banks @@ -826,16 +904,19 @@ impl ReplayStage { stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors); stats.has_voted = tower.has_voted(bank.slot()); stats.is_recent = tower.is_recent(bank.slot()); - new_stats.push(stats.slot); } new_stats } - pub(crate) fn select_fork( + // Returns: + // 1) The heaviest bank + // 2) The latest votable bank on the same fork as the last vote + pub(crate) fn select_forks( frozen_banks: &[Arc], tower: &Tower, - progress: &HashMap, - ) -> Option> { + progress: &ProgressMap, + ancestors: &HashMap>, + ) -> (Option>, Option>) { let tower_start = Instant::now(); let num_frozen_banks = frozen_banks.len(); @@ -845,16 +926,37 @@ impl ReplayStage { .filter(|b| b.slot() < tower.root().unwrap_or(0)) .count(); + let last_vote = tower.last_vote().slots.last().cloned(); + let mut last_votable_on_same_fork = None; let stats: Vec<&ForkStats> = frozen_banks .iter() .map(|bank| { // Only time progress map should be missing a bank slot // is if this node was the leader for this slot as those banks // are not replayed in replay_active_banks() - &progress + let stats = &progress .get(&bank.slot()) .expect("All frozen banks must exist in the Progress map") - .fork_stats + .fork_stats; + + if let Some(last_vote) = last_vote { + if ancestors + .get(&bank.slot()) + .expect("Entry in frozen banks must exist in ancestors") + .contains(&last_vote) + && stats.vote_threshold + { + // Descendant of last vote cannot be locked out + assert!(!stats.is_locked_out); + + // ancestors(slot) should not contain the slot itself, + // so we shouldd never get the same bank as the last vote + assert_ne!(bank.slot(), last_vote); + last_votable_on_same_fork = Some(bank.clone()); + } + } + + stats }) .collect(); let num_not_recent = stats.iter().filter(|s| !s.is_recent).count(); @@ -886,7 +988,7 @@ impl ReplayStage { rv.is_some() ); datapoint_debug!( - "replay_stage-select_fork", + "replay_stage-select_forks", ("frozen_banks", num_frozen_banks as i64, i64), ("not_recent", num_not_recent as i64, i64), ("has_voted", num_has_voted as i64, i64), @@ -900,14 +1002,124 @@ impl ReplayStage { ), ("tower_duration", ms as i64, i64), ); - rv.map(|x| x.0.clone()) + + (rv.map(|x| x.0.clone()), last_votable_on_same_fork) + } + + // Given a heaviest bank, `heaviest_bank` and the next votable bank + // `votable_bank_on_same_fork` as the validator's last vote, return + // a bank to vote on, a bank to reset to, + pub(crate) fn select_vote_and_reset_forks( + heaviest_bank: &Option>, + votable_bank_on_same_fork: &Option>, + earliest_vote_on_fork: u64, + switch_threshold: &mut bool, + ancestors: &HashMap>, + descendants: &HashMap>, + progress: &ProgressMap, + tower: &Tower, + ) -> ( + Option>, + Option>, + Vec, + ) { + // Try to vote on the actual heaviest fork. If the heaviest bank is + // locked out or fails the threshold check, the validator will: + // 1) Not continue to vote on current fork, waiting for lockouts to expire/ + // threshold check to pass + // 2) Will reset PoH to heaviest fork in order to make sure the heaviest + // fork is propagated + // This above behavior should ensure correct voting and resetting PoH + // behavior under all cases: + // 1) The best "selected" bank is on same fork + // 2) The best "selected" bank is on a different fork, + // switch_threshold fails + // 3) The best "selected" bank is on a different fork, + // switch_threshold succceeds + let mut failure_reasons = vec![]; + let selected_fork = { + if let Some(bank) = heaviest_bank { + let selected_same_fork = ancestors + .get(&bank.slot()) + .unwrap() + .contains(&earliest_vote_on_fork); + if selected_same_fork { + // If the heaviest bank is on the same fork as the last + // vote, then there's no need to check the switch threshold. + // Just vote for the latest votable bank on the same fork, + // which is `votable_bank_on_same_fork`. + votable_bank_on_same_fork + } else { + if !*switch_threshold { + let total_staked = + progress.get(&bank.slot()).unwrap().fork_stats.total_staked; + *switch_threshold = tower.check_switch_threshold( + earliest_vote_on_fork, + &ancestors, + &descendants, + &progress, + total_staked, + ); + } + if !*switch_threshold { + // If we can't switch, then vote on the the next votable + // bank on the same fork as our last vote + info!( + "Waiting to switch to {}, voting on {:?} on same fork for now", + bank.slot(), + votable_bank_on_same_fork.as_ref().map(|b| b.slot()) + ); + failure_reasons + .push(HeaviestForkFailures::FailedSwitchThreshold(bank.slot())); + votable_bank_on_same_fork + } else { + // If the switch threshold is observed, halt voting on + // the current fork and attempt to vote/reset Poh/switch to + // theh heaviest bank + heaviest_bank + } + } + } else { + &None + } + }; + + if let Some(bank) = selected_fork { + let (is_locked_out, vote_threshold, fork_weight) = { + let fork_stats = &progress.get(&bank.slot()).unwrap().fork_stats; + ( + fork_stats.is_locked_out, + fork_stats.vote_threshold, + fork_stats.weight, + ) + }; + if is_locked_out { + failure_reasons.push(HeaviestForkFailures::LockedOut(bank.slot())); + } + if !vote_threshold { + failure_reasons.push(HeaviestForkFailures::FailedThreshold(bank.slot())); + } + + if !is_locked_out && vote_threshold { + info!("voting: {} {}", bank.slot(), fork_weight); + ( + selected_fork.clone(), + selected_fork.clone(), + failure_reasons, + ) + } else { + (None, selected_fork.clone(), failure_reasons) + } + } else { + (None, None, failure_reasons) + } } fn confirm_forks( tower: &Tower, stake_lockouts: &HashMap, total_staked: u64, - progress: &HashMap, + progress: &ProgressMap, bank_forks: &RwLock, ) -> Vec { let mut confirmed_forks = vec![]; @@ -941,14 +1153,16 @@ impl ReplayStage { pub(crate) fn handle_new_root( new_root: u64, bank_forks: &RwLock, - progress: &mut HashMap, + progress: &mut ProgressMap, snapshot_package_sender: &Option, + earliest_vote_on_fork: &mut u64, ) { bank_forks .write() .unwrap() .set_root(new_root, snapshot_package_sender); let r_bank_forks = bank_forks.read().unwrap(); + *earliest_vote_on_fork = std::cmp::max(new_root, *earliest_vote_on_fork); progress.retain(|k, _| r_bank_forks.get(*k).is_some()); } @@ -1303,13 +1517,17 @@ pub(crate) mod tests { &towers[i], &mut fork_progresses[i], ); - let response = - ReplayStage::select_fork(&frozen_banks, &towers[i], &fork_progresses[i]); + let (heaviest_bank, _) = ReplayStage::select_forks( + &frozen_banks, + &towers[i], + &mut fork_progresses[i], + &bank_fork_ancestors, + ); - if response.is_none() { + if heaviest_bank.is_none() { None } else { - let bank = response.unwrap(); + let bank = heaviest_bank.unwrap(); let stats = &fork_progresses[i].get(&bank.slot()).unwrap().fork_stats; Some(ForkSelectionResponse { slot: stats.slot, @@ -1445,10 +1663,28 @@ pub(crate) mod tests { for i in 0..=root { progress.insert(i, ForkProgress::new(Hash::default())); } - ReplayStage::handle_new_root(root, &bank_forks, &mut progress, &None); + let mut earliest_vote_on_fork = root - 1; + ReplayStage::handle_new_root( + root, + &bank_forks, + &mut progress, + &None, + &mut earliest_vote_on_fork, + ); assert_eq!(bank_forks.read().unwrap().root(), root); assert_eq!(progress.len(), 1); + assert_eq!(earliest_vote_on_fork, root); assert!(progress.get(&root).is_some()); + + earliest_vote_on_fork = root + 1; + ReplayStage::handle_new_root( + root, + &bank_forks, + &mut progress, + &None, + &mut earliest_vote_on_fork, + ); + assert_eq!(earliest_vote_on_fork, root + 1); } #[test]