diff --git a/core/src/bank_weight_fork_choice.rs b/core/src/bank_weight_fork_choice.rs new file mode 100644 index 0000000000..9b43c14d6d --- /dev/null +++ b/core/src/bank_weight_fork_choice.rs @@ -0,0 +1,154 @@ +use crate::{ + consensus::{ComputedBankState, Tower}, + fork_choice::ForkChoice, + progress_map::{ForkStats, ProgressMap}, +}; +use solana_runtime::{bank::Bank, bank_forks::BankForks}; +use solana_sdk::{clock::Slot, timing}; +use std::time::Instant; +use std::{ + collections::{HashMap, HashSet}, + sync::{Arc, RwLock}, +}; + +#[derive(Default)] +pub struct BankWeightForkChoice {} + +impl ForkChoice for BankWeightForkChoice { + fn compute_bank_stats( + &mut self, + bank: &Bank, + _tower: &Tower, + progress: &mut ProgressMap, + computed_bank_state: &ComputedBankState, + ) { + let bank_slot = bank.slot(); + // Only time progress map should be missing a bank slot + // is if this node was the leader for this slot as those banks + // are not replayed in replay_active_banks() + let parent_weight = bank + .parent() + .and_then(|b| progress.get(&b.slot())) + .map(|x| x.fork_stats.fork_weight) + .unwrap_or(0); + + let stats = progress + .get_fork_stats_mut(bank_slot) + .expect("All frozen banks must exist in the Progress map"); + + let ComputedBankState { bank_weight, .. } = computed_bank_state; + stats.weight = *bank_weight; + stats.fork_weight = stats.weight + parent_weight; + } + + // Returns: + // 1) The heaviest overall bank + // 2) The heaviest bank on the same fork as the last vote (doesn't require a + // switching proof to vote for) + fn select_forks( + &self, + frozen_banks: &[Arc], + tower: &Tower, + progress: &ProgressMap, + ancestors: &HashMap>, + _bank_forks: &RwLock, + ) -> (Arc, Option>) { + let tower_start = Instant::now(); + assert!(!frozen_banks.is_empty()); + let num_frozen_banks = frozen_banks.len(); + + trace!("frozen_banks {}", frozen_banks.len()); + let num_old_banks = frozen_banks + .iter() + .filter(|b| b.slot() < tower.root()) + .count(); + + let last_voted_slot = tower.last_voted_slot(); + let mut heaviest_bank_on_same_fork = None; + let mut heaviest_same_fork_weight = 0; + let stats: Vec<&ForkStats> = frozen_banks + .iter() + .map(|bank| { + // Only time progress map should be missing a bank slot + // is if this node was the leader for this slot as those banks + // are not replayed in replay_active_banks() + let stats = progress + .get_fork_stats(bank.slot()) + .expect("All frozen banks must exist in the Progress map"); + + if let Some(last_voted_slot) = last_voted_slot { + if ancestors + .get(&bank.slot()) + .expect("Entry in frozen banks must exist in ancestors") + .contains(&last_voted_slot) + { + // Descendant of last vote cannot be locked out + assert!(!stats.is_locked_out); + + // ancestors(slot) should not contain the slot itself, + // so we should never get the same bank as the last vote + assert_ne!(bank.slot(), last_voted_slot); + // highest weight, lowest slot first. 
frozen_banks is sorted + // from least slot to greatest slot, so if two banks have + // the same fork weight, the lower slot will be picked + if stats.fork_weight > heaviest_same_fork_weight { + heaviest_bank_on_same_fork = Some(bank.clone()); + heaviest_same_fork_weight = stats.fork_weight; + } + } + } + + stats + }) + .collect(); + let num_not_recent = stats.iter().filter(|s| !s.is_recent).count(); + let num_has_voted = stats.iter().filter(|s| s.has_voted).count(); + let num_empty = stats.iter().filter(|s| s.is_empty).count(); + let num_threshold_failure = stats.iter().filter(|s| !s.vote_threshold).count(); + let num_votable_threshold_failure = stats + .iter() + .filter(|s| s.is_recent && !s.has_voted && !s.vote_threshold) + .count(); + + let mut candidates: Vec<_> = frozen_banks.iter().zip(stats.iter()).collect(); + + //highest weight, lowest slot first + candidates.sort_by_key(|b| (b.1.fork_weight, 0i64 - b.0.slot() as i64)); + let rv = candidates + .last() + .expect("frozen banks was nonempty so candidates must also be nonempty"); + let ms = timing::duration_as_ms(&tower_start.elapsed()); + let weights: Vec<(u128, u64, u64)> = candidates + .iter() + .map(|x| (x.1.weight, x.0.slot(), x.1.block_height)) + .collect(); + debug!( + "@{:?} tower duration: {:?} len: {}/{} weights: {:?}", + timing::timestamp(), + ms, + candidates.len(), + stats.iter().filter(|s| !s.has_voted).count(), + weights, + ); + datapoint_debug!( + "replay_stage-select_forks", + ("frozen_banks", num_frozen_banks as i64, i64), + ("not_recent", num_not_recent as i64, i64), + ("has_voted", num_has_voted as i64, i64), + ("old_banks", num_old_banks as i64, i64), + ("empty_banks", num_empty as i64, i64), + ("threshold_failure", num_threshold_failure as i64, i64), + ( + "votable_threshold_failure", + num_votable_threshold_failure as i64, + i64 + ), + ("tower_duration", ms as i64, i64), + ); + + (rv.0.clone(), heaviest_bank_on_same_fork) + } + + fn mark_fork_invalid_candidate(&mut self, _invalid_slot: Slot) {} + fn mark_fork_valid_candidate(&mut self, _valid_slots: &[Slot]) {} +} diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index d3a143f7ae..4e38ae532f 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -4,6 +4,7 @@ use crate::{ optimistic_confirmation_verifier::OptimisticConfirmationVerifier, optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, poh_recorder::PohRecorder, + replay_stage::DUPLICATE_THRESHOLD, result::{Error, Result}, rpc_subscriptions::RpcSubscriptions, sigverify, @@ -21,6 +22,7 @@ use solana_perf::packet::{self, Packets}; use solana_runtime::{ bank::Bank, bank_forks::BankForks, + commitment::VOTE_THRESHOLD_SIZE, epoch_stakes::{EpochAuthorizedVoters, EpochStakes}, stakes::Stakes, vote_sender_types::{ReplayVoteReceiver, ReplayedVote}, @@ -44,12 +46,17 @@ use std::{ }; // Map from a vote account to the authorized voter for an epoch +pub type ThresholdConfirmedSlots = Vec<(Slot, Hash)>; pub type VerifiedLabelVotePacketsSender = CrossbeamSender>; pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver>; pub type VerifiedVoteTransactionsSender = CrossbeamSender>; pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver>; pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec)>; pub type VerifiedVoteReceiver = CrossbeamReceiver<(Pubkey, Vec)>; +pub type GossipDuplicateConfirmedSlotsSender = CrossbeamSender; +pub type GossipDuplicateConfirmedSlotsReceiver = CrossbeamReceiver; + 
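Note on the tiebreak in `BankWeightForkChoice::select_forks` above: candidates are sorted ascending by `(fork_weight, 0 - slot)` and the last element is taken, so the highest weight wins and equal weights resolve to the lower slot. A minimal, self-contained sketch of that key (the `Candidate` struct is a stand-in for the real `(Arc<Bank>, &ForkStats)` pairs, not part of this patch):

```rust
// Stand-in for the (Arc<Bank>, &ForkStats) candidate pairs in select_forks.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Candidate {
    slot: u64,
    fork_weight: u128,
}

// Sort ascending by (weight, reversed slot); `.last()` then yields the highest
// weight, and among equal weights the lower slot (its reversed key is larger).
fn select_heaviest(mut candidates: Vec<Candidate>) -> Option<Candidate> {
    candidates.sort_by_key(|c| (c.fork_weight, 0i64 - c.slot as i64));
    candidates.last().copied()
}

fn main() {
    let candidates = vec![
        Candidate { slot: 10, fork_weight: 300 },
        Candidate { slot: 12, fork_weight: 500 },
        Candidate { slot: 11, fork_weight: 500 }, // ties with slot 12 on weight
    ];
    // Slot 11 wins: heaviest weight, and the lower slot breaks the tie with 12.
    assert_eq!(select_heaviest(candidates).unwrap().slot, 11);
}
```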
+const THRESHOLDS_TO_CHECK: [f64; 2] = [DUPLICATE_THRESHOLD, VOTE_THRESHOLD_SIZE]; #[derive(Default)] pub struct SlotVoteTracker { @@ -245,6 +252,7 @@ impl ClusterInfoVoteListener { replay_votes_receiver: ReplayVoteReceiver, blockstore: Arc, bank_notification_sender: Option, + cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender, ) -> Self { let exit_ = exit.clone(); @@ -291,6 +299,7 @@ impl ClusterInfoVoteListener { replay_votes_receiver, blockstore, bank_notification_sender, + cluster_confirmed_slot_sender, ); }) .unwrap(); @@ -406,6 +415,7 @@ impl ClusterInfoVoteListener { } } + #[allow(clippy::too_many_arguments)] fn process_votes_loop( exit: Arc, gossip_vote_txs_receiver: VerifiedVoteTransactionsReceiver, @@ -416,10 +426,12 @@ impl ClusterInfoVoteListener { replay_votes_receiver: ReplayVoteReceiver, blockstore: Arc, bank_notification_sender: Option, + cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender, ) -> Result<()> { let mut confirmation_verifier = OptimisticConfirmationVerifier::new(bank_forks.read().unwrap().root()); let mut last_process_root = Instant::now(); + let cluster_confirmed_slot_sender = Some(cluster_confirmed_slot_sender); loop { if exit.load(Ordering::Relaxed) { return Ok(()); @@ -448,10 +460,12 @@ impl ClusterInfoVoteListener { &verified_vote_sender, &replay_votes_receiver, &bank_notification_sender, + &cluster_confirmed_slot_sender, ); match confirmed_slots { Ok(confirmed_slots) => { - confirmation_verifier.add_new_optimistic_confirmed_slots(confirmed_slots); + confirmation_verifier + .add_new_optimistic_confirmed_slots(confirmed_slots.clone()); } Err(e) => match e { Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) @@ -472,7 +486,7 @@ impl ClusterInfoVoteListener { subscriptions: &RpcSubscriptions, verified_vote_sender: &VerifiedVoteSender, replay_votes_receiver: &ReplayVoteReceiver, - ) -> Result> { + ) -> Result { Self::listen_and_confirm_votes( gossip_vote_txs_receiver, vote_tracker, @@ -481,6 +495,7 @@ impl ClusterInfoVoteListener { verified_vote_sender, replay_votes_receiver, &None, + &None, ) } @@ -492,7 +507,8 @@ impl ClusterInfoVoteListener { verified_vote_sender: &VerifiedVoteSender, replay_votes_receiver: &ReplayVoteReceiver, bank_notification_sender: &Option, - ) -> Result> { + cluster_confirmed_slot_sender: &Option, + ) -> Result { let mut sel = Select::new(); sel.recv(gossip_vote_txs_receiver); sel.recv(replay_votes_receiver); @@ -521,6 +537,7 @@ impl ClusterInfoVoteListener { subscriptions, verified_vote_sender, bank_notification_sender, + cluster_confirmed_slot_sender, )); } else { remaining_wait_time = remaining_wait_time @@ -539,9 +556,10 @@ impl ClusterInfoVoteListener { subscriptions: &RpcSubscriptions, verified_vote_sender: &VerifiedVoteSender, diff: &mut HashMap>, - new_optimistic_confirmed_slots: &mut Vec<(Slot, Hash)>, + new_optimistic_confirmed_slots: &mut ThresholdConfirmedSlots, is_gossip_vote: bool, bank_notification_sender: &Option, + cluster_confirmed_slot_sender: &Option, ) { if vote.slots.is_empty() { return; @@ -577,7 +595,7 @@ impl ClusterInfoVoteListener { // Fast track processing of the last slot in a vote transactions // so that notifications for optimistic confirmation can be sent // as soon as possible. 
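The new `THRESHOLDS_TO_CHECK` constant pairs `DUPLICATE_THRESHOLD` with `VOTE_THRESHOLD_SIZE`, and the whole slice is handed to the vote tracker so a single pass can report which thresholds a vote pushes the slot across. The sketch below only illustrates that shape; `add_stake_and_check` is a hypothetical helper and the threshold values are placeholders, not the real `add_vote_pubkey` implementation:

```rust
// Placeholder values for illustration only; the real constants live in
// replay_stage::DUPLICATE_THRESHOLD and commitment::VOTE_THRESHOLD_SIZE.
const DUPLICATE_THRESHOLD: f64 = 0.52;
const VOTE_THRESHOLD_SIZE: f64 = 2.0 / 3.0;
const THRESHOLDS_TO_CHECK: [f64; 2] = [DUPLICATE_THRESHOLD, VOTE_THRESHOLD_SIZE];

// Hypothetical helper: add `new_stake` to the running total and report, per
// threshold, whether this vote newly pushed the total past that fraction of
// `total_stake`. Index 0 corresponds to DUPLICATE_THRESHOLD, index 1 to
// VOTE_THRESHOLD_SIZE, mirroring THRESHOLDS_TO_CHECK.
fn add_stake_and_check(
    voted_stake: &mut u64,
    new_stake: u64,
    total_stake: u64,
    thresholds: &[f64],
) -> Vec<bool> {
    let old = *voted_stake;
    *voted_stake += new_stake;
    thresholds
        .iter()
        .map(|t| {
            let needed = (t * total_stake as f64) as u64;
            // "Newly reached": at or below the threshold before, strictly above after.
            old <= needed && *voted_stake > needed
        })
        .collect()
}

fn main() {
    let mut voted = 50;
    let reached = add_stake_and_check(&mut voted, 20, 100, &THRESHOLDS_TO_CHECK);
    // 70/100 crosses both 52% and ~66.7% in this example.
    assert_eq!(reached, vec![true, true]);
}
```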
- let (is_confirmed, is_new) = Self::track_optimistic_confirmation_vote( + let (reached_threshold_results, is_new) = Self::track_optimistic_confirmation_vote( vote_tracker, last_vote_slot, last_vote_hash, @@ -586,7 +604,12 @@ impl ClusterInfoVoteListener { total_stake, ); - if is_confirmed { + if reached_threshold_results[0] { + if let Some(sender) = cluster_confirmed_slot_sender { + let _ = sender.send(vec![(last_vote_slot, last_vote_hash)]); + } + } + if reached_threshold_results[1] { new_optimistic_confirmed_slots.push((last_vote_slot, last_vote_hash)); // Notify subscribers about new optimistic confirmation if let Some(sender) = bank_notification_sender { @@ -670,7 +693,8 @@ impl ClusterInfoVoteListener { subscriptions: &RpcSubscriptions, verified_vote_sender: &VerifiedVoteSender, bank_notification_sender: &Option, - ) -> Vec<(Slot, Hash)> { + cluster_confirmed_slot_sender: &Option, + ) -> ThresholdConfirmedSlots { let mut diff: HashMap> = HashMap::new(); let mut new_optimistic_confirmed_slots = vec![]; @@ -697,6 +721,7 @@ impl ClusterInfoVoteListener { &mut new_optimistic_confirmed_slots, is_gossip, bank_notification_sender, + cluster_confirmed_slot_sender, ); } @@ -756,14 +781,14 @@ impl ClusterInfoVoteListener { pubkey: Pubkey, stake: u64, total_epoch_stake: u64, - ) -> (bool, bool) { + ) -> (Vec, bool) { let slot_tracker = vote_tracker.get_or_insert_slot_tracker(slot); // Insert vote and check for optimistic confirmation let mut w_slot_tracker = slot_tracker.write().unwrap(); w_slot_tracker .get_or_insert_optimistic_votes_tracker(hash) - .add_vote_pubkey(pubkey, stake, total_epoch_stake) + .add_vote_pubkey(pubkey, stake, total_epoch_stake, &THRESHOLDS_TO_CHECK) } fn sum_stake(sum: &mut u64, epoch_stakes: Option<&EpochStakes>, pubkey: &Pubkey) { @@ -1005,6 +1030,7 @@ mod tests { &verified_vote_sender, &replay_votes_receiver, &None, + &None, ) .unwrap(); @@ -1034,6 +1060,7 @@ mod tests { &verified_vote_sender, &replay_votes_receiver, &None, + &None, ) .unwrap(); @@ -1112,6 +1139,7 @@ mod tests { &verified_vote_sender, &replay_votes_receiver, &None, + &None, ) .unwrap(); @@ -1231,6 +1259,7 @@ mod tests { &verified_vote_sender, &replay_votes_receiver, &None, + &None, ) .unwrap(); @@ -1326,6 +1355,7 @@ mod tests { &verified_vote_sender, &replay_votes_receiver, &None, + &None, ); } let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap(); @@ -1470,6 +1500,7 @@ mod tests { &subscriptions, &verified_vote_sender, &None, + &None, ); // Setup next epoch @@ -1524,6 +1555,7 @@ mod tests { &subscriptions, &verified_vote_sender, &None, + &None, ); } diff --git a/core/src/cluster_slot_state_verifier.rs b/core/src/cluster_slot_state_verifier.rs new file mode 100644 index 0000000000..0edff63f80 --- /dev/null +++ b/core/src/cluster_slot_state_verifier.rs @@ -0,0 +1,751 @@ +use crate::{ + fork_choice::ForkChoice, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, + progress_map::ProgressMap, +}; +use solana_sdk::{clock::Slot, hash::Hash}; +use std::collections::{BTreeMap, HashMap, HashSet}; + +pub type GossipDuplicateConfirmedSlots = BTreeMap; +type SlotStateHandler = fn(Slot, &Hash, Option<&Hash>, bool, bool) -> Vec; + +#[derive(PartialEq, Debug)] +pub enum SlotStateUpdate { + Frozen, + DuplicateConfirmed, + Dead, + Duplicate, +} + +#[derive(PartialEq, Debug)] +pub enum ResultingStateChange { + MarkSlotDuplicate, + RepairDuplicateConfirmedVersion(Hash), + DuplicateConfirmedSlotMatchesCluster, +} + +impl SlotStateUpdate { + fn to_handler(&self) -> SlotStateHandler { + match 
self { + SlotStateUpdate::Dead => on_dead_slot, + SlotStateUpdate::Frozen => on_frozen_slot, + SlotStateUpdate::DuplicateConfirmed => on_cluster_update, + SlotStateUpdate::Duplicate => on_cluster_update, + } + } +} + +fn repair_correct_version(_slot: Slot, _hash: &Hash) {} + +fn on_dead_slot( + slot: Slot, + bank_frozen_hash: &Hash, + cluster_duplicate_confirmed_hash: Option<&Hash>, + _is_slot_duplicate: bool, + is_dead: bool, +) -> Vec { + assert!(is_dead); + // Bank should not have been frozen if the slot was marked dead + assert_eq!(*bank_frozen_hash, Hash::default()); + if let Some(cluster_duplicate_confirmed_hash) = cluster_duplicate_confirmed_hash { + // If the cluster duplicate_confirmed some version of this slot, then + // there's another version + warn!( + "Cluster duplicate_confirmed slot {} with hash {}, but we marked slot dead", + slot, cluster_duplicate_confirmed_hash + ); + // No need to check `is_slot_duplicate` and modify fork choice as dead slots + // are never frozen, and thus never added to fork choice. The state change for + // `MarkSlotDuplicate` will try to modify fork choice, but won't find the slot + // in the fork choice tree, so is equivalent to a + return vec![ + ResultingStateChange::MarkSlotDuplicate, + ResultingStateChange::RepairDuplicateConfirmedVersion( + *cluster_duplicate_confirmed_hash, + ), + ]; + } + + vec![] +} + +fn on_frozen_slot( + slot: Slot, + bank_frozen_hash: &Hash, + cluster_duplicate_confirmed_hash: Option<&Hash>, + is_slot_duplicate: bool, + is_dead: bool, +) -> Vec { + // If a slot is marked frozen, the bank hash should not be default, + // and the slot should not be dead + assert!(*bank_frozen_hash != Hash::default()); + assert!(!is_dead); + + if let Some(cluster_duplicate_confirmed_hash) = cluster_duplicate_confirmed_hash { + // If the cluster duplicate_confirmed some version of this slot, then + // confirm our version agrees with the cluster, + if cluster_duplicate_confirmed_hash != bank_frozen_hash { + // If the versions do not match, modify fork choice rule + // to exclude our version from being voted on and also + // repair correct version + warn!( + "Cluster duplicate_confirmed slot {} with hash {}, but we froze slot with hash {}", + slot, cluster_duplicate_confirmed_hash, bank_frozen_hash + ); + return vec![ + ResultingStateChange::MarkSlotDuplicate, + ResultingStateChange::RepairDuplicateConfirmedVersion( + *cluster_duplicate_confirmed_hash, + ), + ]; + } else { + // If the versions match, then add the slot to the candidate + // set to account for the case where it was removed earlier + // by the `on_duplicate_slot()` handler + return vec![ResultingStateChange::DuplicateConfirmedSlotMatchesCluster]; + } + } + + if is_slot_duplicate { + // If we detected a duplicate, but have not yet seen any version + // of the slot duplicate_confirmed (i.e. block above did not execute), then + // remove the slot from fork choice until we get confirmation. + + // If we get here, we either detected duplicate from + // 1) WindowService + // 2) A gossip duplicate_confirmed version that didn't match our frozen + // version. + // In both cases, mark the progress map for this slot as duplicate + return vec![ResultingStateChange::MarkSlotDuplicate]; + } + + vec![] +} + +// Called when we receive either: +// 1) A duplicate slot signal from WindowStage, +// 2) Confirmation of a slot by observing votes from replay or gossip. 
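The dispatch above routes each `SlotStateUpdate` to a plain function pointer (`type SlotStateHandler = fn(...) -> Vec<ResultingStateChange>`). A stripped-down sketch of the same pattern follows; the enums and the two-flag handler signature are simplified stand-ins, not the real five-argument handlers:

```rust
// Simplified stand-ins for the real enums; only the dispatch shape is shown.
#[derive(Debug, PartialEq)]
enum StateChange {
    MarkSlotDuplicate,
}

enum StateUpdate {
    Frozen,
    Dead,
    DuplicateConfirmed,
    Duplicate,
}

// Reduced handler signature: (is_slot_duplicate, is_dead) -> state changes.
// The real SlotStateHandler also receives the slot, its frozen hash, and the
// cluster duplicate-confirmed hash.
type Handler = fn(bool, bool) -> Vec<StateChange>;

fn on_dead(_is_slot_duplicate: bool, is_dead: bool) -> Vec<StateChange> {
    assert!(is_dead);
    // Nothing to do in this reduced sketch; the real handler also reacts to a
    // cluster-confirmed hash that disagrees with the dead slot.
    vec![]
}

fn on_frozen(is_slot_duplicate: bool, is_dead: bool) -> Vec<StateChange> {
    assert!(!is_dead);
    if is_slot_duplicate {
        vec![StateChange::MarkSlotDuplicate]
    } else {
        vec![]
    }
}

fn on_cluster_update(is_slot_duplicate: bool, is_dead: bool) -> Vec<StateChange> {
    if is_dead {
        on_dead(is_slot_duplicate, is_dead)
    } else {
        on_frozen(is_slot_duplicate, is_dead)
    }
}

impl StateUpdate {
    fn to_handler(&self) -> Handler {
        match self {
            StateUpdate::Dead => on_dead,
            StateUpdate::Frozen => on_frozen,
            StateUpdate::DuplicateConfirmed | StateUpdate::Duplicate => on_cluster_update,
        }
    }
}

fn main() {
    let changes = StateUpdate::Duplicate.to_handler()(true, false);
    assert_eq!(changes, vec![StateChange::MarkSlotDuplicate]);
}
```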
+// +// This signals external information about this slot, which affects +// this validator's understanding of the validity of this slot +fn on_cluster_update( + slot: Slot, + bank_frozen_hash: &Hash, + cluster_duplicate_confirmed_hash: Option<&Hash>, + is_slot_duplicate: bool, + is_dead: bool, +) -> Vec { + if is_dead { + on_dead_slot( + slot, + bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead, + ) + } else if *bank_frozen_hash != Hash::default() { + // This case is mutually exclusive with is_dead case above because if a slot is dead, + // it cannot have been frozen, and thus cannot have a non-default bank hash. + on_frozen_slot( + slot, + bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead, + ) + } else { + vec![] + } +} + +fn get_cluster_duplicate_confirmed_hash<'a>( + slot: Slot, + gossip_duplicate_confirmed_hash: Option<&'a Hash>, + local_frozen_hash: &'a Hash, + is_local_replay_duplicate_confirmed: bool, +) -> Option<&'a Hash> { + let local_duplicate_confirmed_hash = if is_local_replay_duplicate_confirmed { + // If local replay has duplicate_confirmed this slot, this slot must have + // descendants with votes for this slot, hence this slot must be + // frozen. + assert!(*local_frozen_hash != Hash::default()); + Some(local_frozen_hash) + } else { + None + }; + + match ( + local_duplicate_confirmed_hash, + gossip_duplicate_confirmed_hash, + ) { + (Some(local_duplicate_confirmed_hash), Some(gossip_duplicate_confirmed_hash)) => { + if local_duplicate_confirmed_hash != gossip_duplicate_confirmed_hash { + error!( + "For slot {}, the gossip duplicate confirmed hash {}, is not equal + to the confirmed hash we replayed: {}", + slot, gossip_duplicate_confirmed_hash, local_duplicate_confirmed_hash + ); + } + Some(&local_frozen_hash) + } + (Some(local_frozen_hash), None) => Some(local_frozen_hash), + _ => gossip_duplicate_confirmed_hash, + } +} + +fn apply_state_changes( + slot: Slot, + progress: &mut ProgressMap, + fork_choice: &mut HeaviestSubtreeForkChoice, + ancestors: &HashMap>, + descendants: &HashMap>, + state_changes: Vec, +) { + for state_change in state_changes { + match state_change { + ResultingStateChange::MarkSlotDuplicate => { + progress.set_unconfirmed_duplicate_slot( + slot, + descendants.get(&slot).unwrap_or(&HashSet::default()), + ); + fork_choice.mark_fork_invalid_candidate(slot); + } + ResultingStateChange::RepairDuplicateConfirmedVersion( + cluster_duplicate_confirmed_hash, + ) => { + // TODO: Should consider moving the updating of the duplicate slots in the + // progress map from ReplayStage::confirm_forks to here. 
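`get_cluster_duplicate_confirmed_hash` above reconciles two possible sources of a duplicate-confirmed hash: the hash local replay froze and confirmed, and the hash gossip reported. Local replay wins on a mismatch, after logging. A minimal sketch of that precedence with plain `Option`s (`u64` stands in for `Hash`; the function name here is illustrative only):

```rust
// u64 stands in for solana_sdk::hash::Hash; this only shows the precedence rule.
fn resolve_confirmed_hash(
    local_frozen_hash: u64,
    is_local_replay_confirmed: bool,
    gossip_confirmed_hash: Option<u64>,
) -> Option<u64> {
    let local = if is_local_replay_confirmed {
        // Locally confirmed implies the bank was frozen, so the hash is meaningful.
        Some(local_frozen_hash)
    } else {
        None
    };
    match (local, gossip_confirmed_hash) {
        (Some(local_hash), Some(gossip_hash)) => {
            if local_hash != gossip_hash {
                eprintln!(
                    "gossip confirmed hash {} disagrees with replayed hash {}",
                    gossip_hash, local_hash
                );
            }
            Some(local_hash) // local replay takes precedence
        }
        (Some(local_hash), None) => Some(local_hash),
        (None, gossip) => gossip,
    }
}

fn main() {
    // Gossip-only confirmation falls through to the gossip hash.
    assert_eq!(resolve_confirmed_hash(7, false, Some(9)), Some(9));
    // Local confirmation wins even when gossip disagrees.
    assert_eq!(resolve_confirmed_hash(7, true, Some(9)), Some(7));
}
```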
+ repair_correct_version(slot, &cluster_duplicate_confirmed_hash); + } + ResultingStateChange::DuplicateConfirmedSlotMatchesCluster => { + progress.set_confirmed_duplicate_slot( + slot, + ancestors.get(&slot).unwrap_or(&HashSet::default()), + descendants.get(&slot).unwrap_or(&HashSet::default()), + ); + fork_choice.mark_fork_valid_candidate(slot); + } + } + } +} + +pub(crate) fn check_slot_agrees_with_cluster( + slot: Slot, + root: Slot, + frozen_hash: Option, + gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, + ancestors: &HashMap>, + descendants: &HashMap>, + progress: &mut ProgressMap, + fork_choice: &mut HeaviestSubtreeForkChoice, + slot_state_update: SlotStateUpdate, +) { + if slot <= root { + return; + } + + if frozen_hash.is_none() { + // If the bank doesn't even exist in BankForks yet, + // then there's nothing to do as replay of the slot + // hasn't even started + return; + } + + let frozen_hash = frozen_hash.unwrap(); + let gossip_duplicate_confirmed_hash = gossip_duplicate_confirmed_slots.get(&slot); + let is_local_replay_duplicate_confirmed = progress.is_duplicate_confirmed(slot).expect("If the frozen hash exists, then the slot must exist in bank forks and thus in progress map"); + let cluster_duplicate_confirmed_hash = get_cluster_duplicate_confirmed_hash( + slot, + gossip_duplicate_confirmed_hash, + &frozen_hash, + is_local_replay_duplicate_confirmed, + ); + let mut is_slot_duplicate = + progress.is_unconfirmed_duplicate(slot).expect("If the frozen hash exists, then the slot must exist in bank forks and thus in progress map"); + if matches!(slot_state_update, SlotStateUpdate::Duplicate) { + if is_slot_duplicate { + // Already processed duplicate signal for this slot, no need to continue + return; + } else { + // Otherwise, mark the slot as duplicate so the appropriate state changes + // will trigger + is_slot_duplicate = true; + } + } + let is_dead = progress.is_dead(slot).expect("If the frozen hash exists, then the slot must exist in bank forks and thus in progress map"); + + let state_handler = slot_state_update.to_handler(); + let state_changes = state_handler( + slot, + &frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead, + ); + apply_state_changes( + slot, + progress, + fork_choice, + ancestors, + descendants, + state_changes, + ); +} + +#[cfg(test)] +mod test { + use super::*; + use crate::consensus::test::VoteSimulator; + use solana_runtime::bank_forks::BankForks; + use std::sync::RwLock; + use trees::tr; + + struct InitialState { + heaviest_subtree_fork_choice: HeaviestSubtreeForkChoice, + progress: ProgressMap, + ancestors: HashMap>, + descendants: HashMap>, + slot: Slot, + bank_forks: RwLock, + } + + fn setup() -> InitialState { + // Create simple fork 0 -> 1 -> 2 -> 3 + let forks = tr(0) / (tr(1) / (tr(2) / tr(3))); + let mut vote_simulator = VoteSimulator::new(1); + vote_simulator.fill_bank_forks(forks, &HashMap::new()); + let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors(); + + let descendants = vote_simulator + .bank_forks + .read() + .unwrap() + .descendants() + .clone(); + + InitialState { + heaviest_subtree_fork_choice: vote_simulator.heaviest_subtree_fork_choice, + progress: vote_simulator.progress, + ancestors, + descendants, + slot: 0, + bank_forks: vote_simulator.bank_forks, + } + } + + #[test] + fn test_frozen_duplicate() { + // Common state + let slot = 0; + let cluster_duplicate_confirmed_hash = None; + let is_dead = false; + + // Slot is not detected as duplicate yet + let mut 
is_slot_duplicate = false; + + // Simulate freezing the bank, add a + // new non-default hash, should return + // no actionable state changes yet + let bank_frozen_hash = Hash::new_unique(); + assert!(on_frozen_slot( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ) + .is_empty()); + + // Now mark the slot as duplicate, should + // trigger marking the slot as a duplicate + is_slot_duplicate = true; + assert_eq!( + on_cluster_update( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ), + vec![ResultingStateChange::MarkSlotDuplicate] + ); + } + + #[test] + fn test_frozen_duplicate_confirmed() { + // Common state + let slot = 0; + let is_slot_duplicate = false; + let is_dead = false; + + // No cluster duplicate_confirmed hash yet + let mut cluster_duplicate_confirmed_hash = None; + + // Simulate freezing the bank, add a + // new non-default hash, should return + // no actionable state changes + let bank_frozen_hash = Hash::new_unique(); + assert!(on_frozen_slot( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ) + .is_empty()); + + // Now mark the same frozen slot hash as duplicate_confirmed by the cluster, + // should just confirm the slot + cluster_duplicate_confirmed_hash = Some(&bank_frozen_hash); + assert_eq!( + on_cluster_update( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ), + vec![ResultingStateChange::DuplicateConfirmedSlotMatchesCluster,] + ); + + // If the cluster_duplicate_confirmed_hash does not match, then we + // should trigger marking the slot as a duplicate, and also + // try to repair correct version + let mismatched_hash = Hash::new_unique(); + cluster_duplicate_confirmed_hash = Some(&mismatched_hash); + assert_eq!( + on_cluster_update( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ), + vec![ + ResultingStateChange::MarkSlotDuplicate, + ResultingStateChange::RepairDuplicateConfirmedVersion(mismatched_hash), + ] + ); + } + + #[test] + fn test_duplicate_frozen_duplicate_confirmed() { + // Common state + let slot = 0; + let is_dead = false; + let is_slot_duplicate = true; + + // Bank is not frozen yet + let mut cluster_duplicate_confirmed_hash = None; + let mut bank_frozen_hash = Hash::default(); + + // Mark the slot as duplicate. Because our version of the slot is not + // frozen yet, we don't know which version we have, so no action is + // taken. + assert!(on_cluster_update( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ) + .is_empty()); + + // Freeze the bank, should now mark the slot as duplicate since we have + // not seen confirmation yet. 
+ bank_frozen_hash = Hash::new_unique(); + assert_eq!( + on_cluster_update( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ), + vec![ResultingStateChange::MarkSlotDuplicate,] + ); + + // If the cluster_duplicate_confirmed_hash matches, we just confirm + // the slot + cluster_duplicate_confirmed_hash = Some(&bank_frozen_hash); + assert_eq!( + on_cluster_update( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ), + vec![ResultingStateChange::DuplicateConfirmedSlotMatchesCluster,] + ); + + // If the cluster_duplicate_confirmed_hash does not match, then we + // should trigger marking the slot as a duplicate, and also + // try to repair correct version + let mismatched_hash = Hash::new_unique(); + cluster_duplicate_confirmed_hash = Some(&mismatched_hash); + assert_eq!( + on_cluster_update( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ), + vec![ + ResultingStateChange::MarkSlotDuplicate, + ResultingStateChange::RepairDuplicateConfirmedVersion(mismatched_hash), + ] + ); + } + + #[test] + fn test_duplicate_duplicate_confirmed() { + let slot = 0; + let correct_hash = Hash::new_unique(); + let cluster_duplicate_confirmed_hash = Some(&correct_hash); + let is_dead = false; + // Bank is not frozen yet + let bank_frozen_hash = Hash::default(); + + // Because our version of the slot is not frozen yet, then even though + // the cluster has duplicate_confirmed a hash, we don't know which version we + // have, so no action is taken. + let is_slot_duplicate = true; + assert!(on_cluster_update( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ) + .is_empty()); + } + + #[test] + fn test_duplicate_dead() { + let slot = 0; + let cluster_duplicate_confirmed_hash = None; + let is_dead = true; + // Bank is not frozen yet + let bank_frozen_hash = Hash::default(); + + // Even though our version of the slot is dead, the cluster has not + // duplicate_confirmed a hash, we don't know which version we have, so no action + // is taken. 
+ let is_slot_duplicate = true; + assert!(on_cluster_update( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ) + .is_empty()); + } + + #[test] + fn test_duplicate_confirmed_dead_duplicate() { + let slot = 0; + let correct_hash = Hash::new_unique(); + // Cluster has duplicate_confirmed some version of the slot + let cluster_duplicate_confirmed_hash = Some(&correct_hash); + // Our version of the slot is dead + let is_dead = true; + let bank_frozen_hash = Hash::default(); + + // Even if the duplicate signal hasn't come in yet, + // we can deduce the slot is duplicate AND we have, + // the wrong version, so should mark the slot as duplicate, + // and repair the correct version + let mut is_slot_duplicate = false; + assert_eq!( + on_cluster_update( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ), + vec![ + ResultingStateChange::MarkSlotDuplicate, + ResultingStateChange::RepairDuplicateConfirmedVersion(correct_hash), + ] + ); + + // If the duplicate signal comes in, nothing should change + is_slot_duplicate = true; + assert_eq!( + on_cluster_update( + slot, + &bank_frozen_hash, + cluster_duplicate_confirmed_hash, + is_slot_duplicate, + is_dead + ), + vec![ + ResultingStateChange::MarkSlotDuplicate, + ResultingStateChange::RepairDuplicateConfirmedVersion(correct_hash), + ] + ); + } + + #[test] + fn test_apply_state_changes() { + // Common state + let InitialState { + mut heaviest_subtree_fork_choice, + mut progress, + ancestors, + descendants, + slot, + .. + } = setup(); + + // MarkSlotDuplicate should mark progress map and remove + // the slot from fork choice + apply_state_changes( + slot, + &mut progress, + &mut heaviest_subtree_fork_choice, + &ancestors, + &descendants, + vec![ResultingStateChange::MarkSlotDuplicate], + ); + assert!(!heaviest_subtree_fork_choice + .is_candidate_slot(slot) + .unwrap()); + for child_slot in descendants + .get(&slot) + .unwrap() + .iter() + .chain(std::iter::once(&slot)) + { + assert_eq!( + progress + .latest_unconfirmed_duplicate_ancestor(*child_slot) + .unwrap(), + slot + ); + } + + // DuplicateConfirmedSlotMatchesCluster should re-enable fork choice + apply_state_changes( + slot, + &mut progress, + &mut heaviest_subtree_fork_choice, + &ancestors, + &descendants, + vec![ResultingStateChange::DuplicateConfirmedSlotMatchesCluster], + ); + for child_slot in descendants + .get(&slot) + .unwrap() + .iter() + .chain(std::iter::once(&slot)) + { + assert!(progress + .latest_unconfirmed_duplicate_ancestor(*child_slot) + .is_none()); + } + assert!(heaviest_subtree_fork_choice + .is_candidate_slot(slot) + .unwrap()); + } + + #[test] + fn test_state_ancestor_confirmed_descendant_duplicate() { + // Common state + let InitialState { + mut heaviest_subtree_fork_choice, + mut progress, + ancestors, + descendants, + bank_forks, + .. 
+ } = setup(); + + assert_eq!(heaviest_subtree_fork_choice.best_overall_slot(), 3); + let root = 0; + let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); + + // Mark slot 2 as duplicate confirmed + let slot2_hash = bank_forks.read().unwrap().get(2).unwrap().hash(); + gossip_duplicate_confirmed_slots.insert(2, slot2_hash); + check_slot_agrees_with_cluster( + 2, + root, + Some(slot2_hash), + &gossip_duplicate_confirmed_slots, + &ancestors, + &descendants, + &mut progress, + &mut heaviest_subtree_fork_choice, + SlotStateUpdate::DuplicateConfirmed, + ); + + assert_eq!(heaviest_subtree_fork_choice.best_overall_slot(), 3); + + // Mark 3 as duplicate, should not remove slot 2 from fork choice + check_slot_agrees_with_cluster( + 3, + root, + Some(bank_forks.read().unwrap().get(3).unwrap().hash()), + &gossip_duplicate_confirmed_slots, + &ancestors, + &descendants, + &mut progress, + &mut heaviest_subtree_fork_choice, + SlotStateUpdate::Duplicate, + ); + + assert_eq!(heaviest_subtree_fork_choice.best_overall_slot(), 2); + } + + #[test] + fn test_state_ancestor_duplicate_descendant_confirmed() { + // Common state + let InitialState { + mut heaviest_subtree_fork_choice, + mut progress, + ancestors, + descendants, + bank_forks, + .. + } = setup(); + + assert_eq!(heaviest_subtree_fork_choice.best_overall_slot(), 3); + let root = 0; + let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); + // Mark 2 as duplicate confirmed + check_slot_agrees_with_cluster( + 2, + root, + Some(bank_forks.read().unwrap().get(2).unwrap().hash()), + &gossip_duplicate_confirmed_slots, + &ancestors, + &descendants, + &mut progress, + &mut heaviest_subtree_fork_choice, + SlotStateUpdate::Duplicate, + ); + + assert_eq!(heaviest_subtree_fork_choice.best_overall_slot(), 1); + + // Mark slot 3 as duplicate confirmed, should mark slot 2 as duplicate confirmed as well + let slot3_hash = bank_forks.read().unwrap().get(3).unwrap().hash(); + gossip_duplicate_confirmed_slots.insert(3, slot3_hash); + check_slot_agrees_with_cluster( + 3, + root, + Some(slot3_hash), + &gossip_duplicate_confirmed_slots, + &ancestors, + &descendants, + &mut progress, + &mut heaviest_subtree_fork_choice, + SlotStateUpdate::DuplicateConfirmed, + ); + + assert_eq!(heaviest_subtree_fork_choice.best_overall_slot(), 3); + } +} diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 0dab72c1b6..c869fffee9 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -37,6 +37,7 @@ pub enum SwitchForkDecision { SwitchProof(Hash), SameFork, FailedSwitchThreshold(u64, u64), + FailedSwitchDuplicateRollback(Slot), } impl SwitchForkDecision { @@ -51,6 +52,7 @@ impl SwitchForkDecision { assert_ne!(*total_stake, 0); None } + SwitchForkDecision::FailedSwitchDuplicateRollback(_) => None, SwitchForkDecision::SameFork => Some(vote_instruction::vote( vote_account_pubkey, authorized_voter_pubkey, @@ -68,7 +70,12 @@ impl SwitchForkDecision { } pub fn can_vote(&self) -> bool { - !matches!(self, SwitchForkDecision::FailedSwitchThreshold(_, _)) + match self { + SwitchForkDecision::FailedSwitchThreshold(_, _) => false, + SwitchForkDecision::FailedSwitchDuplicateRollback(_) => false, + SwitchForkDecision::SameFork => true, + SwitchForkDecision::SwitchProof(_) => true, + } } } @@ -383,6 +390,17 @@ impl Tower { slot } + pub fn record_bank_vote( + &mut self, + bank: &Bank, + vote_account_pubkey: &Pubkey, + ) -> (Option, Vec /*VoteState.tower*/) { + let (vote, tower_slots) = self.new_vote_from_bank(bank, 
vote_account_pubkey); + + let new_root = self.record_bank_vote_update_lockouts(vote); + (new_root, tower_slots) + } + pub fn new_vote_from_bank( &self, bank: &Bank, @@ -392,7 +410,7 @@ impl Tower { Self::new_vote(&self.lockouts, bank.slot(), bank.hash(), voted_slot) } - pub fn record_bank_vote(&mut self, vote: Vote) -> Option { + pub fn record_bank_vote_update_lockouts(&mut self, vote: Vote) -> Option { let slot = vote.last_voted_slot().unwrap_or(0); trace!("{} record_vote for {}", self.node_pubkey, slot); let old_root = self.root(); @@ -411,7 +429,7 @@ impl Tower { #[cfg(test)] pub fn record_vote(&mut self, slot: Slot, hash: Hash) -> Option { let vote = Vote::new(vec![slot], hash); - self.record_bank_vote(vote) + self.record_bank_vote_update_lockouts(vote) } pub fn last_voted_slot(&self) -> Option { @@ -575,9 +593,13 @@ impl Tower { SwitchForkDecision::FailedSwitchThreshold(0, total_stake) }; + let rollback_due_to_to_to_duplicate_ancestor = |latest_duplicate_ancestor| { + SwitchForkDecision::FailedSwitchDuplicateRollback(latest_duplicate_ancestor) + }; + let last_vote_ancestors = ancestors.get(&last_voted_slot).unwrap_or_else(|| { - if !self.is_stray_last_vote() { + if self.is_stray_last_vote() { // Unless last vote is stray and stale, ancestors.get(last_voted_slot) must // return Some(_), justifying to panic! here. // Also, adjust_lockouts_after_replay() correctly makes last_voted_slot None, @@ -586,9 +608,9 @@ impl Tower { // In other words, except being stray, all other slots have been voted on while // this validator has been running, so we must be able to fetch ancestors for // all of them. - panic!("no ancestors found with slot: {}", last_voted_slot); - } else { empty_ancestors_due_to_minor_unsynced_ledger() + } else { + panic!("no ancestors found with slot: {}", last_voted_slot); } }); @@ -601,15 +623,23 @@ impl Tower { } if last_vote_ancestors.contains(&switch_slot) { - if !self.is_stray_last_vote() { - panic!( - "Should never consider switching to slot ({}), which is ancestors({:?}) of last vote: {}", - switch_slot, - last_vote_ancestors, - last_voted_slot - ); - } else { + if self.is_stray_last_vote() { return suspended_decision_due_to_major_unsynced_ledger(); + } else if let Some(latest_duplicate_ancestor) = progress.latest_unconfirmed_duplicate_ancestor(last_voted_slot) { + // We're rolling back because one of the ancestors of the last vote was a duplicate. In this + // case, it's acceptable if the switch candidate is one of ancestors of the previous vote, + // just fail the switch check because there's no point in voting on an ancestor. ReplayStage + // should then have a special case continue building an alternate fork from this ancestor, NOT + // the `last_voted_slot`. This is in contrast to usual SwitchFailure where ReplayStage continues to build blocks + // on latest vote. See `select_vote_and_reset_forks()` for more details. 
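The branch above produces the new `SwitchForkDecision::FailedSwitchDuplicateRollback` variant, and like `FailedSwitchThreshold` it must block voting: `can_vote` now spells out every arm instead of `!matches!`, so a future variant becomes a compile error here rather than silently voteable. A self-contained sketch of that exhaustive match (payloads simplified, values unchecked):

```rust
// Simplified mirror of SwitchForkDecision::can_vote; payloads reduced for brevity.
#[derive(Debug)]
enum SwitchForkDecision {
    SwitchProof([u8; 32]),
    SameFork,
    FailedSwitchThreshold(u64, u64),
    FailedSwitchDuplicateRollback(u64), // latest unconfirmed duplicate ancestor slot
}

impl SwitchForkDecision {
    fn can_vote(&self) -> bool {
        match self {
            SwitchForkDecision::FailedSwitchThreshold(_, _) => false,
            SwitchForkDecision::FailedSwitchDuplicateRollback(_) => false,
            SwitchForkDecision::SameFork => true,
            SwitchForkDecision::SwitchProof(_) => true,
        }
    }
}

fn main() {
    assert!(!SwitchForkDecision::FailedSwitchDuplicateRollback(44).can_vote());
    assert!(SwitchForkDecision::SameFork.can_vote());
}
```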
+ return rollback_due_to_to_to_duplicate_ancestor(latest_duplicate_ancestor); + } else { + panic!( + "Should never consider switching to ancestor ({}) of last vote: {}, ancestors({:?})", + switch_slot, + last_voted_slot, + last_vote_ancestors, + ); } } @@ -1240,7 +1270,7 @@ pub mod test { cluster_slots::ClusterSlots, fork_choice::SelectVoteAndResetForkResult, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, - progress_map::ForkProgress, + progress_map::{DuplicateStats, ForkProgress}, replay_stage::{HeaviestForkFailures, ReplayStage}, }; use solana_ledger::{blockstore::make_slot_entries, get_tmp_ledger_path}; @@ -1265,7 +1295,7 @@ pub mod test { vote_transaction, }; use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, fs::{remove_file, OpenOptions}, io::{Read, Seek, SeekFrom, Write}, sync::RwLock, @@ -1313,9 +1343,9 @@ pub mod test { while let Some(visit) = walk.get() { let slot = visit.node().data; - self.progress - .entry(slot) - .or_insert_with(|| ForkProgress::new(Hash::default(), None, None, 0, 0)); + self.progress.entry(slot).or_insert_with(|| { + ForkProgress::new(Hash::default(), None, DuplicateStats::default(), None, 0, 0) + }); if self.bank_forks.read().unwrap().get(slot).is_some() { walk.forward(); continue; @@ -1395,7 +1425,7 @@ pub mod test { .. } = ReplayStage::select_vote_and_reset_forks( &vote_bank, - &None, + None, &ancestors, &descendants, &self.progress, @@ -1407,8 +1437,9 @@ pub mod test { if !heaviest_fork_failures.is_empty() { return heaviest_fork_failures; } - let vote = tower.new_vote_from_bank(&vote_bank, &my_vote_pubkey).0; - if let Some(new_root) = tower.record_bank_vote(vote) { + + let (new_root, _) = tower.record_bank_vote(&vote_bank, &my_vote_pubkey); + if let Some(new_root) = new_root { self.set_root(new_root); } @@ -1423,6 +1454,7 @@ pub mod test { &AbsRequestSender::default(), None, &mut self.heaviest_subtree_fork_choice, + &mut BTreeMap::new(), ) } @@ -1457,7 +1489,9 @@ pub mod test { ) { self.progress .entry(slot) - .or_insert_with(|| ForkProgress::new(Hash::default(), None, None, 0, 0)) + .or_insert_with(|| { + ForkProgress::new(Hash::default(), None, DuplicateStats::default(), None, 0, 0) + }) .fork_stats .lockout_intervals .entry(lockout_interval.1) @@ -1564,7 +1598,14 @@ pub mod test { let mut progress = ProgressMap::default(); progress.insert( 0, - ForkProgress::new(bank0.last_blockhash(), None, None, 0, 0), + ForkProgress::new( + bank0.last_blockhash(), + None, + DuplicateStats::default(), + None, + 0, + 0, + ), ); let bank_forks = BankForks::new(bank0); let heaviest_subtree_fork_choice = @@ -1604,6 +1645,12 @@ pub mod test { assert!(decision .to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default()) .is_none()); + + decision = SwitchForkDecision::FailedSwitchDuplicateRollback(0); + assert!(decision + .to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default()) + .is_none()); + decision = SwitchForkDecision::SameFork; assert_eq!( decision.to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default()), @@ -1613,6 +1660,7 @@ pub mod test { vote.clone(), )) ); + decision = SwitchForkDecision::SwitchProof(Hash::default()); assert_eq!( decision.to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default()), @@ -1655,11 +1703,20 @@ pub mod test { } #[test] - fn test_switch_threshold() { + fn test_switch_threshold_duplicate_rollback() { + run_test_switch_threshold_duplicate_rollback(false); + } + + #[test] + #[should_panic] + fn test_switch_threshold_duplicate_rollback_panic() { + 
run_test_switch_threshold_duplicate_rollback(true); + } + + fn setup_switch_test(num_accounts: usize) -> (Arc, VoteSimulator, u64) { // Init state - let mut vote_simulator = VoteSimulator::new(2); - let my_pubkey = vote_simulator.node_pubkeys[0]; - let other_vote_account = vote_simulator.vote_pubkeys[1]; + assert!(num_accounts > 1); + let mut vote_simulator = VoteSimulator::new(num_accounts); let bank0 = vote_simulator .bank_forks .read() @@ -1690,6 +1747,82 @@ pub mod test { for (_, fork_progress) in vote_simulator.progress.iter_mut() { fork_progress.fork_stats.computed = true; } + + (bank0, vote_simulator, total_stake) + } + + fn run_test_switch_threshold_duplicate_rollback(should_panic: bool) { + let (bank0, mut vote_simulator, total_stake) = setup_switch_test(2); + let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors(); + let descendants = vote_simulator + .bank_forks + .read() + .unwrap() + .descendants() + .clone(); + let mut tower = Tower::new_with_key(&vote_simulator.node_pubkeys[0]); + + // Last vote is 47 + tower.record_vote(47, Hash::default()); + + // Trying to switch to an ancestor of last vote should only not panic + // if the current vote has a duplicate ancestor + let ancestor_of_voted_slot = 43; + let duplicate_ancestor1 = 44; + let duplicate_ancestor2 = 45; + vote_simulator.progress.set_unconfirmed_duplicate_slot( + duplicate_ancestor1, + &descendants.get(&duplicate_ancestor1).unwrap(), + ); + vote_simulator.progress.set_unconfirmed_duplicate_slot( + duplicate_ancestor2, + &descendants.get(&duplicate_ancestor2).unwrap(), + ); + assert_eq!( + tower.check_switch_threshold( + ancestor_of_voted_slot, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + ), + SwitchForkDecision::FailedSwitchDuplicateRollback(duplicate_ancestor2) + ); + let mut confirm_ancestors = vec![duplicate_ancestor1]; + if should_panic { + // Adding the last duplicate ancestor will + // 1) Cause loop below to confirm last ancestor + // 2) Check switch threshold on a vote ancestor when there + // are no duplicates on that fork, which will cause a panic + confirm_ancestors.push(duplicate_ancestor2); + } + for (i, duplicate_ancestor) in confirm_ancestors.into_iter().enumerate() { + vote_simulator.progress.set_confirmed_duplicate_slot( + duplicate_ancestor, + ancestors.get(&duplicate_ancestor).unwrap(), + &descendants.get(&duplicate_ancestor).unwrap(), + ); + let res = tower.check_switch_threshold( + ancestor_of_voted_slot, + &ancestors, + &descendants, + &vote_simulator.progress, + total_stake, + bank0.epoch_vote_accounts(0).unwrap(), + ); + if i == 0 { + assert_eq!( + res, + SwitchForkDecision::FailedSwitchDuplicateRollback(duplicate_ancestor2) + ); + } + } + } + + #[test] + fn test_switch_threshold() { + let (bank0, mut vote_simulator, total_stake) = setup_switch_test(2); let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors(); let mut descendants = vote_simulator .bank_forks @@ -1697,7 +1830,8 @@ pub mod test { .unwrap() .descendants() .clone(); - let mut tower = Tower::new_with_key(&my_pubkey); + let mut tower = Tower::new_with_key(&vote_simulator.node_pubkeys[0]); + let other_vote_account = vote_simulator.vote_pubkeys[1]; // Last vote is 47 tower.record_vote(47, Hash::default()); diff --git a/core/src/fork_choice.rs b/core/src/fork_choice.rs index 2f223b26a3..93f5a3d084 100644 --- a/core/src/fork_choice.rs +++ b/core/src/fork_choice.rs @@ -4,6 +4,7 @@ use crate::{ replay_stage::HeaviestForkFailures, }; use 
solana_runtime::{bank::Bank, bank_forks::BankForks}; +use solana_sdk::clock::Slot; use std::{ collections::{HashMap, HashSet}, sync::{Arc, RwLock}, @@ -36,4 +37,8 @@ pub(crate) trait ForkChoice { ancestors: &HashMap>, bank_forks: &RwLock, ) -> (Arc, Option>); + + fn mark_fork_invalid_candidate(&mut self, invalid_slot: Slot); + + fn mark_fork_valid_candidate(&mut self, valid_slot: Slot); } diff --git a/core/src/heaviest_subtree_fork_choice.rs b/core/src/heaviest_subtree_fork_choice.rs index 236980e973..3248ed4739 100644 --- a/core/src/heaviest_subtree_fork_choice.rs +++ b/core/src/heaviest_subtree_fork_choice.rs @@ -25,6 +25,7 @@ const MAX_ROOT_PRINT_SECONDS: u64 = 30; enum UpdateLabel { Aggregate, Add, + MarkValid, Subtract, } @@ -32,6 +33,7 @@ enum UpdateLabel { enum UpdateOperation { Aggregate, Add(u64), + MarkValid, Subtract(u64), } @@ -40,6 +42,7 @@ impl UpdateOperation { match self { Self::Aggregate => panic!("Should not get here"), Self::Add(stake) => *stake += new_stake, + Self::MarkValid => panic!("Should not get here"), Self::Subtract(stake) => *stake += new_stake, } } @@ -56,6 +59,9 @@ struct ForkInfo { best_slot: Slot, parent: Option, children: Vec, + // Whether the fork rooted at this slot is a valid contender + // for the best fork + is_candidate: bool, } pub struct HeaviestSubtreeForkChoice { @@ -142,6 +148,12 @@ impl HeaviestSubtreeForkChoice { .map(|fork_info| fork_info.stake_voted_subtree) } + pub fn is_candidate_slot(&self, slot: Slot) -> Option { + self.fork_infos + .get(&slot) + .map(|fork_info| fork_info.is_candidate) + } + pub fn root(&self) -> Slot { self.root } @@ -205,6 +217,7 @@ impl HeaviestSubtreeForkChoice { best_slot: root_info.best_slot, children: vec![self.root], parent: None, + is_candidate: true, }; self.fork_infos.insert(root_parent, root_parent_info); self.root = root_parent; @@ -226,6 +239,7 @@ impl HeaviestSubtreeForkChoice { best_slot: slot, children: vec![], parent, + is_candidate: true, }); if parent.is_none() { @@ -259,6 +273,15 @@ impl HeaviestSubtreeForkChoice { let child_weight = self .stake_voted_subtree(*child) .expect("child must exist in `self.fork_infos`"); + + // Don't count children currently marked as invalid + if !self + .is_candidate_slot(*child) + .expect("child must exist in tree") + { + continue; + } + if child_weight > maybe_best_child_weight || (maybe_best_child_weight == child_weight && *child < maybe_best_child) { @@ -268,6 +291,7 @@ impl HeaviestSubtreeForkChoice { true } + pub fn all_slots_stake_voted_subtree(&self) -> Vec<(Slot, u64)> { self.fork_infos .iter() @@ -346,18 +370,39 @@ impl HeaviestSubtreeForkChoice { } } - #[allow(clippy::map_entry)] + fn insert_mark_valid_aggregate_operations( + &self, + update_operations: &mut BTreeMap<(Slot, UpdateLabel), UpdateOperation>, + slot: Slot, + ) { + self.do_insert_aggregate_operations(update_operations, true, slot); + } + fn insert_aggregate_operations( &self, update_operations: &mut BTreeMap<(Slot, UpdateLabel), UpdateOperation>, slot: Slot, + ) { + self.do_insert_aggregate_operations(update_operations, false, slot); + } + + #[allow(clippy::map_entry)] + fn do_insert_aggregate_operations( + &self, + update_operations: &mut BTreeMap<(Slot, UpdateLabel), UpdateOperation>, + should_mark_valid: bool, + slot: Slot, ) { for parent in self.ancestor_iterator(slot) { - let label = (parent, UpdateLabel::Aggregate); - if update_operations.contains_key(&label) { + let aggregate_label = (parent, UpdateLabel::Aggregate); + if update_operations.contains_key(&aggregate_label) { break; } else { - 
update_operations.insert(label, UpdateOperation::Aggregate); + if should_mark_valid { + update_operations + .insert((parent, UpdateLabel::MarkValid), UpdateOperation::MarkValid); + } + update_operations.insert(aggregate_label, UpdateOperation::Aggregate); } } } @@ -375,17 +420,44 @@ impl HeaviestSubtreeForkChoice { let mut best_child_slot = slot; for &child in &fork_info.children { let child_stake_voted_subtree = self.stake_voted_subtree(child).unwrap(); + // Child forks that are not candidates still contribute to the weight + // of the subtree rooted at `slot`. For instance: + /* + Build fork structure: + slot 0 + | + slot 1 + / \ + slot 2 | + | slot 3 (34%) + slot 4 (66%) + + If slot 4 is a duplicate slot, so no longer qualifies as a candidate until + the slot is confirmed, the weight of votes on slot 4 should still count towards + slot 2, otherwise we might pick slot 3 as the heaviest fork to build blocks on + instead of slot 2. + */ + + // See comment above for why this check is outside of the `is_candidate` check. stake_voted_subtree += child_stake_voted_subtree; - if best_child_slot == slot || - child_stake_voted_subtree > best_child_stake_voted_subtree || - // tiebreaker by slot height, prioritize earlier slot - (child_stake_voted_subtree == best_child_stake_voted_subtree && child < best_child_slot) + + // Note: If there's no valid children, then the best slot should default to the + // input `slot` itself. + if self + .is_candidate_slot(child) + .expect("Child must exist in fork_info map") + && (best_child_slot == slot || + child_stake_voted_subtree > best_child_stake_voted_subtree || + // tiebreaker by slot height, prioritize earlier slot + (child_stake_voted_subtree == best_child_stake_voted_subtree && child < best_child_slot)) { - best_child_stake_voted_subtree = child_stake_voted_subtree; - best_child_slot = child; - best_slot = self - .best_slot(child) - .expect("`child` must exist in `self.fork_infos`"); + { + best_child_stake_voted_subtree = child_stake_voted_subtree; + best_child_slot = child; + best_slot = self + .best_slot(child) + .expect("`child` must exist in `self.fork_infos`"); + } } } } else { @@ -397,6 +469,12 @@ impl HeaviestSubtreeForkChoice { fork_info.best_slot = best_slot; } + fn mark_slot_valid(&mut self, valid_slot: Slot) { + if let Some(fork_info) = self.fork_infos.get_mut(&valid_slot) { + fork_info.is_candidate = true; + } + } + fn generate_update_operations( &mut self, pubkey_votes: &[(Pubkey, Slot)], @@ -453,6 +531,7 @@ impl HeaviestSubtreeForkChoice { // Iterate through the update operations from greatest to smallest slot for ((slot, _), operation) in update_operations.into_iter().rev() { match operation { + UpdateOperation::MarkValid => self.mark_slot_valid(slot), UpdateOperation::Aggregate => self.aggregate_slot(slot), UpdateOperation::Add(stake) => self.add_slot_stake(slot, stake), UpdateOperation::Subtract(stake) => self.subtract_slot_stake(slot, stake), @@ -602,6 +681,33 @@ impl ForkChoice for HeaviestSubtreeForkChoice { }), ) } + + fn mark_fork_invalid_candidate(&mut self, invalid_slot: Slot) { + let fork_info = self.fork_infos.get_mut(&invalid_slot); + if let Some(fork_info) = fork_info { + if fork_info.is_candidate { + fork_info.is_candidate = false; + // Aggregate to find the new best slots excluding this fork + let mut aggregate_operations = BTreeMap::new(); + self.insert_aggregate_operations(&mut aggregate_operations, invalid_slot); + self.process_update_operations(aggregate_operations); + } + } + } + + fn mark_fork_valid_candidate(&mut self, 
valid_slot: Slot) { + let mut aggregate_operations = BTreeMap::new(); + let fork_info = self.fork_infos.get_mut(&valid_slot); + if let Some(fork_info) = fork_info { + // If a bunch of slots on the same fork are confirmed at once, then only the latest + // slot will incur this aggregation operation + fork_info.is_candidate = true; + self.insert_mark_valid_aggregate_operations(&mut aggregate_operations, valid_slot); + } + + // Aggregate to find the new best slots including this fork + self.process_update_operations(aggregate_operations); + } } struct AncestorIterator<'a> { @@ -1563,6 +1669,78 @@ mod test { ); } + #[test] + fn test_mark_valid_invalid_forks() { + let mut heaviest_subtree_fork_choice = setup_forks(); + let stake = 100; + let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys(3, stake); + + let pubkey_votes: Vec<(Pubkey, Slot)> = vec![ + (vote_pubkeys[0], 6), + (vote_pubkeys[1], 6), + (vote_pubkeys[2], 2), + ]; + let expected_best_slot = 6; + assert_eq!( + heaviest_subtree_fork_choice.add_votes( + &pubkey_votes, + bank.epoch_stakes_map(), + bank.epoch_schedule() + ), + expected_best_slot, + ); + + // Mark slot 5 as invalid, the best fork should be its ancestor 3, + // not the other for at 4. + let invalid_candidate = 5; + heaviest_subtree_fork_choice.mark_fork_invalid_candidate(invalid_candidate); + assert_eq!(heaviest_subtree_fork_choice.best_overall_slot(), 3); + assert!(!heaviest_subtree_fork_choice + .is_candidate_slot(invalid_candidate) + .unwrap()); + + // The ancestor is still a candidate + assert!(heaviest_subtree_fork_choice.is_candidate_slot(3).unwrap()); + + // Adding another descendant to the invalid candidate won't + // update the best slot, even if it contains votes + let new_leaf_slot7 = 7; + heaviest_subtree_fork_choice.add_new_leaf_slot(new_leaf_slot7, Some(6)); + assert_eq!(heaviest_subtree_fork_choice.best_overall_slot(), 3); + let pubkey_votes: Vec<(Pubkey, Slot)> = vec![(vote_pubkeys[0], new_leaf_slot7)]; + let invalid_slot_ancestor = 3; + assert_eq!( + heaviest_subtree_fork_choice.add_votes( + &pubkey_votes, + bank.epoch_stakes_map(), + bank.epoch_schedule() + ), + invalid_slot_ancestor, + ); + + // Adding a descendant to the ancestor of the invalid candidate *should* update + // the best slot though, since the ancestor is on the heaviest fork + let new_leaf_slot8 = 8; + heaviest_subtree_fork_choice.add_new_leaf_slot(new_leaf_slot8, Some(invalid_slot_ancestor)); + assert_eq!( + heaviest_subtree_fork_choice.best_overall_slot(), + new_leaf_slot8 + ); + + // If we mark slot a descendant of `invalid_candidate` as valid, then that + // should also mark `invalid_candidate` as valid, and the best slot should + // be the leaf of the heaviest fork, `new_leaf_slot`. 
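The comment block and diagram in `aggregate_slot` above spell out the two rules this change introduces: a non-candidate (duplicate) child still contributes its stake to the parent's subtree weight, but it is skipped when choosing the parent's best child. A toy aggregation over a hand-built map illustrates both rules side by side; the `Node` fields are made up for this sketch and are not the real `ForkInfo` (tie-breaking by lower slot is omitted):

```rust
// Minimal sketch (not the real HeaviestSubtreeForkChoice) of the aggregation rule.
use std::collections::HashMap;

#[derive(Clone)]
struct Node {
    stake_voted_at: u64,      // stake voting directly on this slot
    stake_voted_subtree: u64, // filled in by `aggregate`
    children: Vec<u64>,
    is_candidate: bool,
    best_slot: u64,
}

fn aggregate(slot: u64, tree: &mut HashMap<u64, Node>) {
    let node = tree[&slot].clone();
    let mut subtree = node.stake_voted_at;
    let mut best_slot = slot;
    let mut best_child_stake = 0;
    for child in &node.children {
        let child_node = &tree[child];
        // Every child's stake counts, candidate or not, so an ancestor on the
        // heaviest fork is not penalized by a duplicate descendant.
        subtree += child_node.stake_voted_subtree;
        // Only candidates may become the best child.
        if child_node.is_candidate && child_node.stake_voted_subtree > best_child_stake {
            best_child_stake = child_node.stake_voted_subtree;
            best_slot = child_node.best_slot;
        }
    }
    let entry = tree.get_mut(&slot).unwrap();
    entry.stake_voted_subtree = subtree;
    entry.best_slot = best_slot;
}

fn main() {
    let mut tree = HashMap::new();
    let leaf = |stake, is_candidate, slot| Node {
        stake_voted_at: stake,
        stake_voted_subtree: stake,
        children: vec![],
        is_candidate,
        best_slot: slot,
    };
    tree.insert(2, leaf(66, false, 2)); // heavier child, but marked invalid (duplicate)
    tree.insert(3, leaf(34, true, 3));
    tree.insert(
        1,
        Node {
            stake_voted_at: 0,
            stake_voted_subtree: 0,
            children: vec![2, 3],
            is_candidate: true,
            best_slot: 1,
        },
    );

    aggregate(1, &mut tree);
    // Slot 1 still carries the full 100 stake in its subtree ...
    assert_eq!(tree[&1].stake_voted_subtree, 100);
    // ... but its best slot is the valid child 3, not the heavier invalid child 2.
    assert_eq!(tree[&1].best_slot, 3);
}
```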
+ heaviest_subtree_fork_choice.mark_fork_valid_candidate(invalid_candidate); + assert!(heaviest_subtree_fork_choice + .is_candidate_slot(invalid_candidate) + .unwrap()); + assert_eq!( + heaviest_subtree_fork_choice.best_overall_slot(), + // Should pick the smaller slot of the two new equally weighted leaves + new_leaf_slot7 + ); + } + fn setup_forks() -> HeaviestSubtreeForkChoice { /* Build fork structure: diff --git a/core/src/lib.rs b/core/src/lib.rs index f096cead27..a09a068bca 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -22,6 +22,7 @@ pub mod shred_fetch_stage; #[macro_use] pub mod contact_info; pub mod cluster_info; +pub mod cluster_slot_state_verifier; pub mod cluster_slots; pub mod cluster_slots_service; pub mod consensus; diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index d182460d96..222b8dc377 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -275,6 +275,10 @@ impl PohRecorder { || !self.is_same_fork_as_previous_leader(current_slot))) } + pub fn last_reset_slot(&self) -> Slot { + self.start_slot + } + /// returns if leader slot has been reached, how many grace ticks were afforded, /// imputed leader_slot and self.start_slot /// reached_leader_slot() == true means "ready for a bank" diff --git a/core/src/progress_map.rs b/core/src/progress_map.rs index a4b9b562cf..a6f2671ddd 100644 --- a/core/src/progress_map.rs +++ b/core/src/progress_map.rs @@ -139,6 +139,7 @@ pub(crate) struct ForkProgress { pub(crate) propagated_stats: PropagatedStats, pub(crate) replay_stats: ReplaySlotStats, pub(crate) replay_progress: ConfirmationProgress, + pub(crate) duplicate_stats: DuplicateStats, // Note `num_blocks_on_fork` and `num_dropped_blocks_on_fork` only // count new blocks replayed since last restart, which won't include // blocks already existing in the ledger/before snapshot at start, @@ -151,6 +152,7 @@ impl ForkProgress { pub fn new( last_entry: Hash, prev_leader_slot: Option, + duplicate_stats: DuplicateStats, validator_stake_info: Option, num_blocks_on_fork: u64, num_dropped_blocks_on_fork: u64, @@ -184,6 +186,7 @@ impl ForkProgress { fork_stats: ForkStats::default(), replay_stats: ReplaySlotStats::default(), replay_progress: ConfirmationProgress::new(last_entry), + duplicate_stats, num_blocks_on_fork, num_dropped_blocks_on_fork, propagated_stats: PropagatedStats { @@ -203,6 +206,7 @@ impl ForkProgress { my_pubkey: &Pubkey, voting_pubkey: &Pubkey, prev_leader_slot: Option, + duplicate_stats: DuplicateStats, num_blocks_on_fork: u64, num_dropped_blocks_on_fork: u64, ) -> Self { @@ -222,11 +226,20 @@ impl ForkProgress { Self::new( bank.last_blockhash(), prev_leader_slot, + duplicate_stats, validator_fork_info, num_blocks_on_fork, num_dropped_blocks_on_fork, ) } + + pub fn is_duplicate_confirmed(&self) -> bool { + self.duplicate_stats.is_duplicate_confirmed + } + + pub fn set_duplicate_confirmed(&mut self) { + self.duplicate_stats.set_duplicate_confirmed(); + } } #[derive(Debug, Clone, Default)] @@ -241,7 +254,7 @@ pub(crate) struct ForkStats { pub(crate) vote_threshold: bool, pub(crate) is_locked_out: bool, pub(crate) voted_stakes: VotedStakes, - pub(crate) confirmation_reported: bool, + pub(crate) is_supermajority_confirmed: bool, pub(crate) computed: bool, pub(crate) lockout_intervals: LockoutIntervals, } @@ -259,6 +272,38 @@ pub(crate) struct PropagatedStats { pub(crate) total_epoch_stake: u64, } +#[derive(Clone, Default)] +pub(crate) struct DuplicateStats { + latest_unconfirmed_duplicate_ancestor: Option, + is_duplicate_confirmed: bool, 
+} + +impl DuplicateStats { + pub fn new_with_unconfirmed_duplicate_ancestor( + latest_unconfirmed_duplicate_ancestor: Option, + ) -> Self { + Self { + latest_unconfirmed_duplicate_ancestor, + is_duplicate_confirmed: false, + } + } + + fn set_duplicate_confirmed(&mut self) { + self.is_duplicate_confirmed = true; + self.latest_unconfirmed_duplicate_ancestor = None; + } + + fn update_with_newly_confirmed_duplicate_ancestor(&mut self, newly_confirmed_ancestor: Slot) { + if let Some(latest_unconfirmed_duplicate_ancestor) = + self.latest_unconfirmed_duplicate_ancestor + { + if latest_unconfirmed_duplicate_ancestor <= newly_confirmed_ancestor { + self.latest_unconfirmed_duplicate_ancestor = None; + } + } + } +} + impl PropagatedStats { pub fn add_vote_pubkey(&mut self, vote_pubkey: Pubkey, stake: u64) { if self.propagated_validators.insert(vote_pubkey) { @@ -347,6 +392,12 @@ impl ProgressMap { .map(|fork_progress| &mut fork_progress.fork_stats) } + pub fn is_dead(&self, slot: Slot) -> Option { + self.progress_map + .get(&slot) + .map(|fork_progress| fork_progress.is_dead) + } + pub fn is_propagated(&self, slot: Slot) -> bool { let leader_slot_to_check = self.get_latest_leader_slot(slot); @@ -378,6 +429,118 @@ impl ProgressMap { } } + pub fn is_unconfirmed_duplicate(&self, slot: Slot) -> Option { + self.get(&slot).map(|p| { + p.duplicate_stats + .latest_unconfirmed_duplicate_ancestor + .map(|ancestor| ancestor == slot) + .unwrap_or(false) + }) + } + + pub fn latest_unconfirmed_duplicate_ancestor(&self, slot: Slot) -> Option { + self.get(&slot) + .map(|p| p.duplicate_stats.latest_unconfirmed_duplicate_ancestor) + .unwrap_or(None) + } + + pub fn set_unconfirmed_duplicate_slot(&mut self, slot: Slot, descendants: &HashSet) { + if let Some(fork_progress) = self.get_mut(&slot) { + if fork_progress.is_duplicate_confirmed() { + assert!(fork_progress + .duplicate_stats + .latest_unconfirmed_duplicate_ancestor + .is_none()); + return; + } + + if fork_progress + .duplicate_stats + .latest_unconfirmed_duplicate_ancestor + == Some(slot) + { + // Already been marked + return; + } + fork_progress + .duplicate_stats + .latest_unconfirmed_duplicate_ancestor = Some(slot); + + for d in descendants { + if let Some(fork_progress) = self.get_mut(&d) { + fork_progress + .duplicate_stats + .latest_unconfirmed_duplicate_ancestor = Some(std::cmp::max( + fork_progress + .duplicate_stats + .latest_unconfirmed_duplicate_ancestor + .unwrap_or(0), + slot, + )); + } + } + } + } + + pub fn set_confirmed_duplicate_slot( + &mut self, + slot: Slot, + ancestors: &HashSet, + descendants: &HashSet, + ) { + for a in ancestors { + if let Some(fork_progress) = self.get_mut(&a) { + fork_progress.set_duplicate_confirmed(); + } + } + + if let Some(slot_fork_progress) = self.get_mut(&slot) { + // Setting the fields here is nly correct and necessary if the loop above didn't + // already do this, so check with an assert. 
+ assert!(!ancestors.contains(&slot)); + let slot_had_unconfirmed_duplicate_ancestor = slot_fork_progress + .duplicate_stats + .latest_unconfirmed_duplicate_ancestor + .is_some(); + slot_fork_progress.set_duplicate_confirmed(); + + if slot_had_unconfirmed_duplicate_ancestor { + for d in descendants { + if let Some(descendant_fork_progress) = self.get_mut(&d) { + descendant_fork_progress + .duplicate_stats + .update_with_newly_confirmed_duplicate_ancestor(slot); + } + } + } else { + // Neither this slot `S`, nor earlier ancestors were marked as duplicate, + // so this means all descendants either: + // 1) Have no duplicate ancestors + // 2) Have a duplicate ancestor > `S` + + // In both cases, there's no need to iterate through descendants because + // this confirmation on `S` is irrelevant to them. + } + } + } + + pub fn set_supermajority_confirmed_slot(&mut self, slot: Slot) { + let slot_progress = self.get_mut(&slot).unwrap(); + slot_progress.fork_stats.is_supermajority_confirmed = true; + } + + pub fn is_supermajority_confirmed(&self, slot: Slot) -> Option { + self.progress_map + .get(&slot) + .map(|s| s.fork_stats.is_supermajority_confirmed) + } + + pub fn is_duplicate_confirmed(&self, slot: Slot) -> Option { + self.progress_map + .get(&slot) + .map(|s| s.is_duplicate_confirmed()) + } + pub fn get_bank_prev_leader_slot(&self, bank: &Bank) -> Option { let parent_slot = bank.parent_slot(); self.get_propagated_stats(parent_slot) @@ -420,6 +583,8 @@ impl ProgressMap { #[cfg(test)] mod test { use super::*; + use crate::consensus::test::VoteSimulator; + use trees::tr; #[test] fn test_add_vote_pubkey() { @@ -510,13 +675,21 @@ mod test { fn test_is_propagated_status_on_construction() { // If the given ValidatorStakeInfo == None, then this is not // a leader slot and is_propagated == false - let progress = ForkProgress::new(Hash::default(), Some(9), None, 0, 0); + let progress = ForkProgress::new( + Hash::default(), + Some(9), + DuplicateStats::default(), + None, + 0, + 0, + ); assert!(!progress.propagated_stats.is_propagated); // If the stake is zero, then threshold is always achieved let progress = ForkProgress::new( Hash::default(), Some(9), + DuplicateStats::default(), Some(ValidatorStakeInfo { total_epoch_stake: 0, ..ValidatorStakeInfo::default() @@ -531,6 +704,7 @@ mod test { let progress = ForkProgress::new( Hash::default(), Some(9), + DuplicateStats::default(), Some(ValidatorStakeInfo { total_epoch_stake: 2, ..ValidatorStakeInfo::default() @@ -544,6 +718,7 @@ mod test { let progress = ForkProgress::new( Hash::default(), Some(9), + DuplicateStats::default(), Some(ValidatorStakeInfo { stake: 1, total_epoch_stake: 2, @@ -560,6 +735,7 @@ mod test { let progress = ForkProgress::new( Hash::default(), Some(9), + DuplicateStats::default(), Some(ValidatorStakeInfo::default()), 0, 0, @@ -573,12 +749,23 @@ mod test { // Insert new ForkProgress for slot 10 (not a leader slot) and its // previous leader slot 9 (leader slot) - progress_map.insert(10, ForkProgress::new(Hash::default(), Some(9), None, 0, 0)); + progress_map.insert( + 10, + ForkProgress::new( + Hash::default(), + Some(9), + DuplicateStats::default(), + None, + 0, + 0, + ), + ); progress_map.insert( 9, ForkProgress::new( Hash::default(), None, + DuplicateStats::default(), Some(ValidatorStakeInfo::default()), 0, 0, @@ -593,7 +780,17 @@ mod test { // The previous leader before 8, slot 7, does not exist in // progress map, so is_propagated(8) should return true as // this implies the parent is rooted - progress_map.insert(8, 
ForkProgress::new(Hash::default(), Some(7), None, 0, 0)); + progress_map.insert( + 8, + ForkProgress::new( + Hash::default(), + Some(7), + DuplicateStats::default(), + None, + 0, + 0, + ), + ); assert!(progress_map.is_propagated(8)); // If we set the is_propagated = true, is_propagated should return true @@ -616,4 +813,157 @@ mod test { .is_leader_slot = true; assert!(!progress_map.is_propagated(10)); } + + fn setup_set_unconfirmed_and_confirmed_duplicate_slot_tests( + smaller_duplicate_slot: Slot, + larger_duplicate_slot: Slot, + ) -> (ProgressMap, RwLock) { + // Create simple fork 0 -> 1 -> 2 -> 3 -> 4 -> 5 + let forks = tr(0) / (tr(1) / (tr(2) / (tr(3) / (tr(4) / tr(5))))); + let mut vote_simulator = VoteSimulator::new(1); + vote_simulator.fill_bank_forks(forks, &HashMap::new()); + let VoteSimulator { + mut progress, + bank_forks, + .. + } = vote_simulator; + let descendants = bank_forks.read().unwrap().descendants().clone(); + + // Mark the slots as unconfirmed duplicates + progress.set_unconfirmed_duplicate_slot( + smaller_duplicate_slot, + &descendants.get(&smaller_duplicate_slot).unwrap(), + ); + progress.set_unconfirmed_duplicate_slot( + larger_duplicate_slot, + &descendants.get(&larger_duplicate_slot).unwrap(), + ); + + // Correctness checks + for slot in bank_forks.read().unwrap().banks().keys() { + if *slot < smaller_duplicate_slot { + assert!(progress + .latest_unconfirmed_duplicate_ancestor(*slot) + .is_none()); + } else if *slot < larger_duplicate_slot { + assert_eq!( + progress + .latest_unconfirmed_duplicate_ancestor(*slot) + .unwrap(), + smaller_duplicate_slot + ); + } else { + assert_eq!( + progress + .latest_unconfirmed_duplicate_ancestor(*slot) + .unwrap(), + larger_duplicate_slot + ); + } + } + + (progress, bank_forks) + } + + #[test] + fn test_set_unconfirmed_duplicate_confirm_smaller_slot_first() { + let smaller_duplicate_slot = 1; + let larger_duplicate_slot = 4; + let (mut progress, bank_forks) = setup_set_unconfirmed_and_confirmed_duplicate_slot_tests( + smaller_duplicate_slot, + larger_duplicate_slot, + ); + let descendants = bank_forks.read().unwrap().descendants().clone(); + let ancestors = bank_forks.read().unwrap().ancestors(); + + // Mark the smaller duplicate slot as confirmed + progress.set_confirmed_duplicate_slot( + smaller_duplicate_slot, + &ancestors.get(&smaller_duplicate_slot).unwrap(), + &descendants.get(&smaller_duplicate_slot).unwrap(), + ); + for slot in bank_forks.read().unwrap().banks().keys() { + if *slot < larger_duplicate_slot { + // Only slots <= smaller_duplicate_slot have been duplicate confirmed + if *slot <= smaller_duplicate_slot { + assert!(progress.is_duplicate_confirmed(*slot).unwrap()); + } else { + assert!(!progress.is_duplicate_confirmed(*slot).unwrap()); + } + // The unconfirmed duplicate flag has been cleared on the smaller + // descendants because their most recent duplicate ancestor has + // been confirmed + assert!(progress + .latest_unconfirmed_duplicate_ancestor(*slot) + .is_none()); + } else { + assert!(!progress.is_duplicate_confirmed(*slot).unwrap(),); + // The unconfirmed duplicate flag has not been cleared on the smaller + // descendants because their most recent duplicate ancestor, + // `larger_duplicate_slot` has not yet been confirmed + assert_eq!( + progress + .latest_unconfirmed_duplicate_ancestor(*slot) + .unwrap(), + larger_duplicate_slot + ); + } + } + + // Mark the larger duplicate slot as confirmed, all slots should no longer + // have any unconfirmed duplicate ancestors, and should be marked as duplciate 
confirmed + progress.set_confirmed_duplicate_slot( + larger_duplicate_slot, + &ancestors.get(&larger_duplicate_slot).unwrap(), + &descendants.get(&larger_duplicate_slot).unwrap(), + ); + for slot in bank_forks.read().unwrap().banks().keys() { + // All slots <= the latest duplciate confirmed slot are ancestors of + // that slot, so they should all be marked duplicate confirmed + assert_eq!( + progress.is_duplicate_confirmed(*slot).unwrap(), + *slot <= larger_duplicate_slot + ); + assert!(progress + .latest_unconfirmed_duplicate_ancestor(*slot) + .is_none()); + } + } + + #[test] + fn test_set_unconfirmed_duplicate_confirm_larger_slot_first() { + let smaller_duplicate_slot = 1; + let larger_duplicate_slot = 4; + let (mut progress, bank_forks) = setup_set_unconfirmed_and_confirmed_duplicate_slot_tests( + smaller_duplicate_slot, + larger_duplicate_slot, + ); + let descendants = bank_forks.read().unwrap().descendants().clone(); + let ancestors = bank_forks.read().unwrap().ancestors(); + + // Mark the larger duplicate slot as confirmed + progress.set_confirmed_duplicate_slot( + larger_duplicate_slot, + &ancestors.get(&larger_duplicate_slot).unwrap(), + &descendants.get(&larger_duplicate_slot).unwrap(), + ); + + // All slots should no longer have any unconfirmed duplicate ancestors + progress.set_confirmed_duplicate_slot( + larger_duplicate_slot, + &ancestors.get(&larger_duplicate_slot).unwrap(), + &descendants.get(&larger_duplicate_slot).unwrap(), + ); + for slot in bank_forks.read().unwrap().banks().keys() { + // All slots <= the latest duplciate confirmed slot are ancestors of + // that slot, so they should all be marked duplicate confirmed + assert_eq!( + progress.is_duplicate_confirmed(*slot).unwrap(), + *slot <= larger_duplicate_slot + ); + assert!(progress + .latest_unconfirmed_duplicate_ancestor(*slot) + .is_none()); + } + } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 7313067f69..37bf15dd43 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -4,19 +4,23 @@ use crate::{ broadcast_stage::RetransmitSlotsSender, cache_block_time_service::CacheBlockTimeSender, cluster_info::ClusterInfo, - cluster_info_vote_listener::VoteTracker, + cluster_info_vote_listener::{GossipDuplicateConfirmedSlotsReceiver, VoteTracker}, + cluster_slot_state_verifier::*, cluster_slots::ClusterSlots, commitment_service::{AggregateCommitmentService, CommitmentAggregationData}, - consensus::{ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes}, + consensus::{ + ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes, SWITCH_FORK_THRESHOLD, + }, fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, - progress_map::{ForkProgress, ProgressMap, PropagatedStats}, + progress_map::{DuplicateStats, ForkProgress, ProgressMap, PropagatedStats}, repair_service::DuplicateSlotsResetReceiver, result::Result, rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, + window_service::DuplicateSlotReceiver, }; use solana_client::rpc_response::SlotUpdate; use solana_ledger::{ @@ -43,7 +47,7 @@ use solana_sdk::{ }; use solana_vote_program::{vote_instruction, vote_state::Vote}; use std::{ - collections::{HashMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet}, result, sync::{ atomic::{AtomicBool, Ordering}, @@ -57,6 +61,8 @@ use 
std::{ pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64; pub const MAX_UNCONFIRMED_SLOTS: usize = 5; +pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1; +pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD; #[derive(PartialEq, Debug)] pub(crate) enum HeaviestForkFailures { @@ -120,10 +126,11 @@ pub struct ReplayTiming { compute_slot_stats_elapsed: u64, generate_new_bank_forks_elapsed: u64, replay_active_banks_elapsed: u64, - reset_duplicate_slots_elapsed: u64, wait_receive_elapsed: u64, heaviest_fork_failures_elapsed: u64, bank_count: u64, + process_gossip_duplicate_confirmed_slots_elapsed: u64, + process_duplicate_slots_elapsed: u64, } impl ReplayTiming { #[allow(clippy::too_many_arguments)] @@ -139,10 +146,11 @@ impl ReplayTiming { compute_slot_stats_elapsed: u64, generate_new_bank_forks_elapsed: u64, replay_active_banks_elapsed: u64, - reset_duplicate_slots_elapsed: u64, wait_receive_elapsed: u64, heaviest_fork_failures_elapsed: u64, bank_count: u64, + process_gossip_duplicate_confirmed_slots_elapsed: u64, + process_duplicate_slots_elapsed: u64, ) { self.collect_frozen_banks_elapsed += collect_frozen_banks_elapsed; self.compute_bank_stats_elapsed += compute_bank_stats_elapsed; @@ -154,10 +162,12 @@ impl ReplayTiming { self.compute_slot_stats_elapsed += compute_slot_stats_elapsed; self.generate_new_bank_forks_elapsed += generate_new_bank_forks_elapsed; self.replay_active_banks_elapsed += replay_active_banks_elapsed; - self.reset_duplicate_slots_elapsed += reset_duplicate_slots_elapsed; self.wait_receive_elapsed += wait_receive_elapsed; self.heaviest_fork_failures_elapsed += heaviest_fork_failures_elapsed; self.bank_count += bank_count; + self.process_gossip_duplicate_confirmed_slots_elapsed += + process_gossip_duplicate_confirmed_slots_elapsed; + self.process_duplicate_slots_elapsed += process_duplicate_slots_elapsed; let now = timestamp(); let elapsed_ms = now - self.last_print; if elapsed_ms > 1000 { @@ -207,8 +217,8 @@ impl ReplayTiming { i64 ), ( - "reset_duplicate_slots_elapsed", - self.reset_duplicate_slots_elapsed as i64, + "process_gossip_duplicate_confirmed_slots_elapsed", + self.process_gossip_duplicate_confirmed_slots_elapsed as i64, i64 ), ( @@ -222,6 +232,11 @@ impl ReplayTiming { i64 ), ("bank_count", self.bank_count as i64, i64), + ( + "process_duplicate_slots_elapsed", + self.process_duplicate_slots_elapsed as i64, + i64 + ) ); *self = ReplayTiming::default(); @@ -243,13 +258,15 @@ impl ReplayStage { bank_forks: Arc>, cluster_info: Arc, ledger_signal_receiver: Receiver, + duplicate_slots_receiver: DuplicateSlotReceiver, poh_recorder: Arc>, mut tower: Tower, vote_tracker: Arc, cluster_slots: Arc, retransmit_slots_sender: RetransmitSlotsSender, - duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver, + _duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver, replay_vote_sender: ReplayVoteSender, + gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver, ) -> Self { let ReplayStageConfig { my_pubkey, @@ -294,6 +311,7 @@ impl ReplayStage { let mut partition_exists = false; let mut skipped_slots_info = SkippedSlotsInfo::default(); let mut replay_timing = ReplayTiming::default(); + let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = BTreeMap::new(); loop { let allocated = thread_mem_usage::Allocatedp::default(); @@ -320,6 +338,8 @@ impl ReplayStage { let start = allocated.get(); let mut replay_active_banks_time = 
Measure::start("replay_active_banks_time"); + let ancestors = bank_forks.read().unwrap().ancestors(); + let descendants = bank_forks.read().unwrap().descendants().clone(); let did_complete_bank = Self::replay_active_banks( &blockstore, &bank_forks, @@ -333,19 +353,20 @@ impl ReplayStage { &bank_notification_sender, &rewards_recorder_sender, &subscriptions, + &gossip_duplicate_confirmed_slots, + &ancestors, + &descendants, ); replay_active_banks_time.stop(); Self::report_memory(&allocated, "replay_active_banks", start); - let mut reset_duplicate_slots_time = Measure::start("reset_duplicate_slots"); - let mut ancestors = bank_forks.read().unwrap().ancestors(); - let mut descendants = bank_forks.read().unwrap().descendants().clone(); let forks_root = bank_forks.read().unwrap().root(); let start = allocated.get(); // Reset any duplicate slots that have been confirmed // by the network in anticipation of the confirmed version of // the slot + /*let mut reset_duplicate_slots_time = Measure::start("reset_duplicate_slots"); Self::reset_duplicate_slots( &duplicate_slots_reset_receiver, &mut ancestors, @@ -353,7 +374,35 @@ impl ReplayStage { &mut progress, &bank_forks, ); - reset_duplicate_slots_time.stop(); + reset_duplicate_slots_time.stop();*/ + + // Check for any newly confirmed slots detected from gossip. + let mut process_gossip_duplicate_confirmed_slots_time = Measure::start("process_gossip_duplicate_confirmed_slots"); + Self::process_gossip_duplicate_confirmed_slots( + &gossip_duplicate_confirmed_slots_receiver, + &mut gossip_duplicate_confirmed_slots, + &bank_forks, + &mut progress, + &mut heaviest_subtree_fork_choice, + &ancestors, + &descendants, + ); + process_gossip_duplicate_confirmed_slots_time.stop(); + + // Check to remove any duplicated slots from fork choice + let mut process_duplicate_slots_time = Measure::start("process_duplicate_slots"); + if !tpu_has_bank { + Self::process_duplicate_slots( + &duplicate_slots_receiver, + &gossip_duplicate_confirmed_slots, + &bank_forks, + &ancestors, + &descendants, + &mut progress, + &mut heaviest_subtree_fork_choice, + ); + } + process_duplicate_slots_time.stop(); let mut collect_frozen_banks_time = Measure::start("frozen_banks"); let mut frozen_banks: Vec<_> = bank_forks @@ -391,20 +440,12 @@ impl ReplayStage { &bank_forks, ); - for slot in confirmed_forks { - progress - .get_mut(&slot) - .unwrap() - .fork_stats - .confirmation_reported = true; - } + Self::mark_slots_confirmed(&confirmed_forks, &bank_forks, &mut progress, &ancestors, &descendants, &mut heaviest_subtree_fork_choice); } compute_slot_stats_time.stop(); let mut select_forks_time = Measure::start("select_forks_time"); - let fork_choice: &mut dyn ForkChoice = - &mut heaviest_subtree_fork_choice; - let (heaviest_bank, heaviest_bank_on_same_voted_fork) = fork_choice + let (heaviest_bank, heaviest_bank_on_same_voted_fork) = heaviest_subtree_fork_choice .select_forks(&frozen_banks, &tower, &progress, &ancestors, &bank_forks); select_forks_time.stop(); @@ -418,7 +459,7 @@ impl ReplayStage { heaviest_fork_failures, } = Self::select_vote_and_reset_forks( &heaviest_bank, - &heaviest_bank_on_same_voted_fork, + heaviest_bank_on_same_voted_fork.as_ref(), &ancestors, &descendants, &progress, @@ -481,6 +522,7 @@ impl ReplayStage { &mut heaviest_subtree_fork_choice, &cache_block_time_sender, &bank_notification_sender, + &mut gossip_duplicate_confirmed_slots, ); }; voting_time.stop(); @@ -612,10 +654,11 @@ impl ReplayStage { compute_slot_stats_time.as_us(), generate_new_bank_forks_time.as_us(), 
replay_active_banks_time.as_us(), - reset_duplicate_slots_time.as_us(), wait_receive_time.as_us(), heaviest_fork_failures_time.as_us(), if did_complete_bank {1} else {0}, + process_gossip_duplicate_confirmed_slots_time.as_us(), + process_duplicate_slots_time.as_us(), ); } Ok(()) @@ -674,6 +717,9 @@ impl ReplayStage { // Initialize progress map with any root banks for bank in &frozen_banks { let prev_leader_slot = progress.get_bank_prev_leader_slot(bank); + let duplicate_stats = DuplicateStats::new_with_unconfirmed_duplicate_ancestor( + progress.latest_unconfirmed_duplicate_ancestor(bank.parent_slot()), + ); progress.insert( bank.slot(), ForkProgress::new_from_bank( @@ -681,6 +727,7 @@ impl ReplayStage { &my_pubkey, &vote_account, prev_leader_slot, + duplicate_stats, 0, 0, ), @@ -704,6 +751,7 @@ impl ReplayStage { ); } + #[allow(dead_code)] fn reset_duplicate_slots( duplicate_slots_reset_receiver: &DuplicateSlotsResetReceiver, ancestors: &mut HashMap>, @@ -722,6 +770,7 @@ impl ReplayStage { } } + #[allow(dead_code)] fn purge_unconfirmed_duplicate_slot( duplicate_slot: Slot, ancestors: &mut HashMap>, @@ -807,6 +856,90 @@ impl ReplayStage { .expect("must exist based on earlier check"); } + // Check for any newly confirmed slots by the cluster. This is only detects + // optimistic and in the future, duplicate slot confirmations on the exact + // single slots and does not account for votes on their descendants. Used solely + // for duplicate slot recovery. + fn process_gossip_duplicate_confirmed_slots( + gossip_duplicate_confirmed_slots_receiver: &GossipDuplicateConfirmedSlotsReceiver, + gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots, + bank_forks: &RwLock, + progress: &mut ProgressMap, + fork_choice: &mut HeaviestSubtreeForkChoice, + ancestors: &HashMap>, + descendants: &HashMap>, + ) { + let root = bank_forks.read().unwrap().root(); + for new_confirmed_slots in gossip_duplicate_confirmed_slots_receiver.try_iter() { + for (confirmed_slot, confirmed_hash) in new_confirmed_slots { + if confirmed_slot <= root { + continue; + } else if let Some(prev_hash) = + gossip_duplicate_confirmed_slots.insert(confirmed_slot, confirmed_hash) + { + assert_eq!(prev_hash, confirmed_hash); + // Already processed this signal + return; + } + + check_slot_agrees_with_cluster( + confirmed_slot, + root, + bank_forks + .read() + .unwrap() + .get(confirmed_slot) + .map(|b| b.hash()), + gossip_duplicate_confirmed_slots, + ancestors, + descendants, + progress, + fork_choice, + SlotStateUpdate::DuplicateConfirmed, + ); + } + } + } + + // Checks for and handle forks with duplicate slots. 
+ fn process_duplicate_slots( + duplicate_slots_receiver: &DuplicateSlotReceiver, + gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, + bank_forks: &RwLock, + ancestors: &HashMap>, + descendants: &HashMap>, + progress: &mut ProgressMap, + fork_choice: &mut HeaviestSubtreeForkChoice, + ) { + let duplicate_slots: Vec = duplicate_slots_receiver.try_iter().collect(); + let (root_slot, bank_hashes) = { + let r_bank_forks = bank_forks.read().unwrap(); + let bank_hashes: Vec> = duplicate_slots + .iter() + .map(|slot| r_bank_forks.get(*slot).map(|bank| bank.hash())) + .collect(); + + (r_bank_forks.root(), bank_hashes) + }; + + for (duplicate_slot, bank_hash) in duplicate_slots.into_iter().zip(bank_hashes.into_iter()) + { + // WindowService should only send the signal once per slot + + check_slot_agrees_with_cluster( + duplicate_slot, + root_slot, + bank_hash, + gossip_duplicate_confirmed_slots, + ancestors, + descendants, + progress, + fork_choice, + SlotStateUpdate::Duplicate, + ); + } + } + fn log_leader_change( my_pubkey: &Pubkey, bank_slot: Slot, @@ -999,7 +1132,6 @@ impl ReplayStage { transaction_status_sender: Option, replay_vote_sender: &ReplayVoteSender, verify_recyclers: &VerifyRecyclers, - subscriptions: &Arc, ) -> result::Result { let tx_count_before = bank_progress.replay_progress.num_txs; let confirm_result = blockstore_processor::confirm_slot( @@ -1016,48 +1148,40 @@ impl ReplayStage { ); let tx_count_after = bank_progress.replay_progress.num_txs; let tx_count = tx_count_after - tx_count_before; - confirm_result.map_err(|err| { - // LedgerCleanupService should not be cleaning up anything - // that comes after the root, so we should not see any - // errors related to the slot being purged - let slot = bank.slot(); - - // Block producer can abandon the block if it detects a better one - // while producing. Somewhat common and expected in a - // network with variable network/machine configuration. - let is_serious = !matches!( - err, - BlockstoreProcessorError::InvalidBlock(BlockError::TooFewTicks) - ); - if is_serious { - warn!("Fatal replay error in slot: {}, err: {:?}", slot, err); - } else { - info!("Slot had too few ticks: {}", slot); - } - Self::mark_dead_slot( - blockstore, - bank_progress, - slot, - &err, - is_serious, - subscriptions, - ); - + // All errors must lead to marking the slot as dead, otherwise, + // the `check_slot_agrees_with_cluster()` called by `replay_active_banks()` + // will break! err })?; Ok(tx_count) } + #[allow(clippy::too_many_arguments)] fn mark_dead_slot( blockstore: &Blockstore, - bank_progress: &mut ForkProgress, - slot: Slot, + bank: &Bank, + root: Slot, err: &BlockstoreProcessorError, - is_serious: bool, subscriptions: &Arc, + gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, + ancestors: &HashMap>, + descendants: &HashMap>, + progress: &mut ProgressMap, + heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, ) { + // Do not remove from progress map when marking dead! Needed by + // `process_gossip_duplicate_confirmed_slots()` + + // Block producer can abandon the block if it detects a better one + // while producing. Somewhat common and expected in a + // network with variable network/machine configuration. 
+ let is_serious = !matches!( + err, + BlockstoreProcessorError::InvalidBlock(BlockError::TooFewTicks) + ); + let slot = bank.slot(); if is_serious { datapoint_error!( "replay-stage-mark_dead_slot", @@ -1071,7 +1195,7 @@ impl ReplayStage { ("slot", slot, i64) ); } - bank_progress.is_dead = true; + progress.get_mut(&slot).unwrap().is_dead = true; blockstore .set_dead_slot(slot) .expect("Failed to mark slot as dead in blockstore"); @@ -1080,6 +1204,17 @@ impl ReplayStage { err: format!("error: {:?}", err), timestamp: timestamp(), }); + check_slot_agrees_with_cluster( + slot, + root, + Some(bank.hash()), + gossip_duplicate_confirmed_slots, + ancestors, + descendants, + progress, + heaviest_subtree_fork_choice, + SlotStateUpdate::Dead, + ); } #[allow(clippy::too_many_arguments)] @@ -1102,13 +1237,13 @@ impl ReplayStage { heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, cache_block_time_sender: &Option, bank_notification_sender: &Option, + gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots, ) { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); } trace!("handle votable bank {}", bank.slot()); - let (vote, tower_slots) = tower.new_vote_from_bank(bank, vote_account_pubkey); - let new_root = tower.record_bank_vote(vote); + let (new_root, tower_slots) = tower.record_bank_vote(bank, vote_account_pubkey); let last_vote = tower.last_vote_and_timestamp(); if let Err(err) = tower.save(&cluster_info.keypair) { @@ -1154,6 +1289,7 @@ impl ReplayStage { accounts_background_request_sender, highest_confirmed_root, heaviest_subtree_fork_choice, + gossip_duplicate_confirmed_slots, ); subscriptions.notify_roots(rooted_slots); if let Some(sender) = bank_notification_sender { @@ -1287,8 +1423,8 @@ impl ReplayStage { my_pubkey: &Pubkey, blockstore: &Blockstore, bank: &Arc, - poh_recorder: &Arc>, - leader_schedule_cache: &Arc, + poh_recorder: &Mutex, + leader_schedule_cache: &LeaderScheduleCache, ) { let next_leader_slot = leader_schedule_cache.next_leader_slot( &my_pubkey, @@ -1319,8 +1455,8 @@ impl ReplayStage { #[allow(clippy::too_many_arguments)] fn replay_active_banks( - blockstore: &Arc, - bank_forks: &Arc>, + blockstore: &Blockstore, + bank_forks: &RwLock, my_pubkey: &Pubkey, vote_account: &Pubkey, progress: &mut ProgressMap, @@ -1331,6 +1467,9 @@ impl ReplayStage { bank_notification_sender: &Option, rewards_recorder_sender: &Option, subscriptions: &Arc, + gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, + ancestors: &HashMap>, + descendants: &HashMap>, ) -> bool { let mut did_complete_bank = false; let mut tx_count = 0; @@ -1357,6 +1496,12 @@ impl ReplayStage { stats.num_dropped_blocks_on_fork + new_dropped_blocks; (num_blocks_on_fork, num_dropped_blocks_on_fork) }; + + // New children adopt the same latest duplicate ancestor as their parent. 
+ let duplicate_stats = DuplicateStats::new_with_unconfirmed_duplicate_ancestor( + progress.latest_unconfirmed_duplicate_ancestor(bank.parent_slot()), + ); + // Insert a progress entry even for slots this node is the leader for, so that // 1) confirm_forks can report confirmation, 2) we can cache computations about // this bank in `select_forks()` @@ -1366,11 +1511,13 @@ impl ReplayStage { &my_pubkey, vote_account, prev_leader_slot, + duplicate_stats, num_blocks_on_fork, num_dropped_blocks_on_fork, ) }); if bank.collector_id() != my_pubkey { + let root_slot = bank_forks.read().unwrap().root(); let replay_result = Self::replay_blockstore_into_bank( &bank, &blockstore, @@ -1378,12 +1525,23 @@ impl ReplayStage { transaction_status_sender.clone(), replay_vote_sender, verify_recyclers, - subscriptions, ); match replay_result { Ok(replay_tx_count) => tx_count += replay_tx_count, Err(err) => { - trace!("replay_result err: {:?}, slot {}", err, bank_slot); + // Error means the slot needs to be marked as dead + Self::mark_dead_slot( + blockstore, + &bank, + root_slot, + &err, + subscriptions, + gossip_duplicate_confirmed_slots, + ancestors, + descendants, + progress, + heaviest_subtree_fork_choice, + ); // If the bank was corrupted, don't try to run the below logic to check if the // bank is completed continue; @@ -1392,41 +1550,34 @@ impl ReplayStage { } assert_eq!(*bank_slot, bank.slot()); if bank.is_complete() { - if !blockstore.has_duplicate_shreds_in_slot(bank.slot()) { - bank_progress.replay_stats.report_stats( - bank.slot(), - bank_progress.replay_progress.num_entries, - bank_progress.replay_progress.num_shreds, - ); - did_complete_bank = true; - info!("bank frozen: {}", bank.slot()); - bank.freeze(); - heaviest_subtree_fork_choice - .add_new_leaf_slot(bank.slot(), Some(bank.parent_slot())); - if let Some(sender) = bank_notification_sender { - sender - .send(BankNotification::Frozen(bank.clone())) - .unwrap_or_else(|err| { - warn!("bank_notification_sender failed: {:?}", err) - }); - } - - Self::record_rewards(&bank, &rewards_recorder_sender); - } else { - Self::mark_dead_slot( - blockstore, - bank_progress, - bank.slot(), - &BlockstoreProcessorError::InvalidBlock(BlockError::DuplicateBlock), - true, - subscriptions, - ); - warn!( - "{} duplicate shreds detected, not freezing bank {}", - my_pubkey, - bank.slot() - ); + bank_progress.replay_stats.report_stats( + bank.slot(), + bank_progress.replay_progress.num_entries, + bank_progress.replay_progress.num_shreds, + ); + did_complete_bank = true; + info!("bank frozen: {}", bank.slot()); + bank.freeze(); + heaviest_subtree_fork_choice + .add_new_leaf_slot(bank.slot(), Some(bank.parent_slot())); + check_slot_agrees_with_cluster( + bank.slot(), + bank_forks.read().unwrap().root(), + Some(bank.hash()), + gossip_duplicate_confirmed_slots, + ancestors, + descendants, + progress, + heaviest_subtree_fork_choice, + SlotStateUpdate::Frozen, + ); + if let Some(sender) = bank_notification_sender { + sender + .send(BankNotification::Frozen(bank.clone())) + .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err)); } + + Self::record_rewards(&bank, &rewards_recorder_sender); } else { trace!( "bank {} not completed tick_height: {}, max_tick_height: {}", @@ -1596,7 +1747,8 @@ impl ReplayStage { // a bank to vote on, a bank to reset to, pub(crate) fn select_vote_and_reset_forks( heaviest_bank: &Arc, - heaviest_bank_on_same_voted_fork: &Option>, + // Should only be None if there was no previous vote + heaviest_bank_on_same_voted_fork: 
Option<&Arc>,
        ancestors: &HashMap>,
        descendants: &HashMap>,
        progress: &ProgressMap,
@@ -1627,25 +1779,90 @@ impl ReplayStage {
                    .epoch_vote_accounts(heaviest_bank.epoch())
                    .expect("Bank epoch vote accounts must contain entry for the bank's own epoch"),
            );
-            if let SwitchForkDecision::FailedSwitchThreshold(_, _) = switch_fork_decision {
-                // If we can't switch, then reset to the the next votable
-                // bank on the same fork as our last vote, but don't vote
-                info!(
-                    "Waiting to switch vote to {}, resetting to slot {:?} on same fork for now",
-                    heaviest_bank.slot(),
-                    heaviest_bank_on_same_voted_fork.as_ref().map(|b| b.slot())
-                );
-                failure_reasons.push(HeaviestForkFailures::FailedSwitchThreshold(
-                    heaviest_bank.slot(),
-                ));
-                heaviest_bank_on_same_voted_fork
-                    .as_ref()
-                    .map(|b| (b, switch_fork_decision))
-            } else {
-                // If the switch threshold is observed, halt voting on
-                // the current fork and attempt to vote/reset Poh to
-                // the heaviest bank
-                Some((heaviest_bank, switch_fork_decision))
+
+            match switch_fork_decision {
+                SwitchForkDecision::FailedSwitchThreshold(_, _) => {
+                    let reset_bank = heaviest_bank_on_same_voted_fork;
+                    // If we can't switch and our last vote was on a non-duplicate/confirmed slot, then
+                    // reset to the next votable bank on the same fork as our last vote,
+                    // but don't vote.
+
+                    // We don't just reset to the heaviest fork when switch threshold fails because
+                    // a situation like this can occur:
+
+                    /* Figure 1:
+                                  slot 0
+                                    |
+                                  slot 1
+                                  /    \
+                    slot 2 (last vote)  |
+                                  |     slot 8 (10%)
+                          slot 4 (9%)
+                    */
+
+                    // Imagine 90% of validators voted on slot 4, but only 9% landed. If everybody that fails
+                    // the switch threshold abandons slot 4 to build on slot 8 (because it's *currently* heavier),
+                    // then there will be no blocks to include the votes for slot 4, and the network halts
+                    // because 90% of validators can't vote
+                    info!(
+                        "Waiting to switch vote to {}, resetting to slot {:?} for now",
+                        heaviest_bank.slot(),
+                        reset_bank.as_ref().map(|b| b.slot()),
+                    );
+                    failure_reasons.push(HeaviestForkFailures::FailedSwitchThreshold(
+                        heaviest_bank.slot(),
+                    ));
+                    reset_bank.map(|b| (b, switch_fork_decision))
+                }
+                SwitchForkDecision::FailedSwitchDuplicateRollback(latest_duplicate_ancestor) => {
+                    // If we can't switch and our last vote was on an unconfirmed, duplicate slot,
+                    // then we need to reset to the heaviest bank, even if the heaviest bank is not
+                    // a descendant of the last vote (usually for switch threshold failures we reset
+                    // to the heaviest descendant of the last vote, but in this case, the last vote
+                    // was on a duplicate branch). This is because in the case of *unconfirmed* duplicate
+                    // slots, somebody needs to generate an alternative branch to escape a situation
+                    // like a 50-50 split where both partitions have voted on different versions of the
+                    // same duplicate slot.
+
+                    // Unlike the situation described in `Figure 1` above, this is safe. To see why,
+                    // imagine the same situation described in Figure 1 above occurs, but slot 2 is
+                    // a duplicate block. There are now a few cases:
+                    //
+                    // Note first that DUPLICATE_THRESHOLD + SWITCH_FORK_THRESHOLD + DUPLICATE_LIVENESS_THRESHOLD = 1;
+                    //
+                    // 1) > DUPLICATE_THRESHOLD of the network voted on some version of slot 2. Because duplicate slots can be confirmed
+                    // by gossip, unlike the situation described in `Figure 1`, we don't need those
+                    // votes to land in a descendant to confirm slot 2. Once slot 2 is confirmed by
+                    // gossip votes, that fork is added back to the fork choice set and falls back into
+                    // normal fork choice, which is covered by the `FailedSwitchThreshold` case above
+                    // (everyone will resume building on their last voted fork, slot 4, since slot 8
+                    // doesn't have enough stake to pass the switch threshold)
+                    //
+                    // 2) <= DUPLICATE_THRESHOLD of the network voted on some version of slot 2, > SWITCH_FORK_THRESHOLD of the network voted
+                    // on slot 8. Then everybody abandons the duplicate fork from fork choice and both partitions build
+                    // on slot 8's fork. They can also vote on slot 8's fork because it has sufficient weight
+                    // to pass the switching threshold
+                    //
+                    // 3) <= DUPLICATE_THRESHOLD of the network voted on some version of slot 2, <= SWITCH_FORK_THRESHOLD of the network voted
+                    // on slot 8. This means more than DUPLICATE_LIVENESS_THRESHOLD of the network is gone, so we cannot
+                    // guarantee progress anyway
+
+                    // Note the heaviest fork is never descended from a known unconfirmed duplicate slot
+                    // because the fork choice rule ensures that (it marks such forks as invalid candidates),
+                    // thus it's safe to use as the reset bank.
+                    let reset_bank = Some(heaviest_bank);
+                    info!(
+                        "Waiting to switch vote to {}, resetting to slot {:?} for now, latest duplicate ancestor: {:?}",
+                        heaviest_bank.slot(),
+                        reset_bank.as_ref().map(|b| b.slot()),
+                        latest_duplicate_ancestor,
+                    );
+                    failure_reasons.push(HeaviestForkFailures::FailedSwitchThreshold(
+                        heaviest_bank.slot(),
+                    ));
+                    reset_bank.map(|b| (b, switch_fork_decision))
+                }
+                _ => Some((heaviest_bank, switch_fork_decision)),
             }
         };
@@ -1825,6 +2042,47 @@ impl ReplayStage {
        did_newly_reach_threshold
    }

+    fn mark_slots_confirmed(
+        confirmed_forks: &[Slot],
+        bank_forks: &RwLock,
+        progress: &mut ProgressMap,
+        ancestors: &HashMap>,
+        descendants: &HashMap>,
+        fork_choice: &mut HeaviestSubtreeForkChoice,
+    ) {
+        let (root_slot, bank_hashes) = {
+            let r_bank_forks = bank_forks.read().unwrap();
+            let bank_hashes: Vec> = confirmed_forks
+                .iter()
+                .map(|slot| r_bank_forks.get(*slot).map(|bank| bank.hash()))
+                .collect();
+
+            (r_bank_forks.root(), bank_hashes)
+        };
+        for (slot, bank_hash) in confirmed_forks.iter().zip(bank_hashes.into_iter()) {
+            // This case should be guaranteed as false by confirm_forks()
+            if let Some(false) = progress.is_supermajority_confirmed(*slot) {
+                // Because supermajority confirmation will iterate through all ancestors/descendants
+                // in `check_slot_agrees_with_cluster`, only incur this cost if the slot wasn't already
+                // confirmed
+                progress.set_supermajority_confirmed_slot(*slot);
+                check_slot_agrees_with_cluster(
+                    *slot,
+                    root_slot,
+                    bank_hash,
+                    // Don't need to pass the gossip confirmed slots since `slot`
+                    // is already marked as confirmed in progress
+                    &BTreeMap::new(),
+                    ancestors,
+                    descendants,
+                    progress,
+                    fork_choice,
+                    SlotStateUpdate::DuplicateConfirmed,
+                );
+            }
+        }
+    }
+
    fn confirm_forks(
        tower: &Tower,
        voted_stakes: &VotedStakes,
@@ -1834,7 +2092,7 @@ impl ReplayStage {
    ) -> Vec {
        let mut confirmed_forks = vec![];
        for (slot, prog) in progress.iter() {
-            if !prog.fork_stats.confirmation_reported {
+            if !prog.fork_stats.is_supermajority_confirmed {
                let bank = bank_forks
                    .read()
                    .unwrap()
@@ -1866,6 +2124,7 @@ impl ReplayStage {
        accounts_background_request_sender: &AbsRequestSender,
        highest_confirmed_root: Option,
        heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
+        gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
    ) {
        bank_forks.write().unwrap().set_root(
            new_root,
@@ -1875,6 +2134,9 @@ impl ReplayStage { let r_bank_forks = bank_forks.read().unwrap(); progress.handle_new_root(&r_bank_forks); heaviest_subtree_fork_choice.set_root(new_root); + let mut slots_ge_root = gossip_duplicate_confirmed_slots.split_off(&new_root); + // gossip_duplicate_confirmed_slots now only contains entries >= `new_root` + std::mem::swap(gossip_duplicate_confirmed_slots, &mut slots_ge_root); } fn generate_new_bank_forks( @@ -2031,7 +2293,7 @@ pub(crate) mod tests { use solana_ledger::{ blockstore::make_slot_entries, blockstore::{entries_to_test_shreds, BlockstoreError}, - create_new_tmp_ledger, + blockstore_processor, create_new_tmp_ledger, entry::{self, next_entry, Entry}, genesis_utils::{create_genesis_config, create_genesis_config_with_leader}, get_tmp_ledger_path, @@ -2123,6 +2385,7 @@ pub(crate) mod tests { bank0.collector_id(), &Pubkey::default(), None, + DuplicateStats::default(), 0, 0, ), @@ -2180,6 +2443,7 @@ pub(crate) mod tests { bank1.collector_id(), validator_voting_keys.get(&bank1.collector_id()).unwrap(), Some(0), + DuplicateStats::default(), 0, 0, ), @@ -2271,8 +2535,16 @@ pub(crate) mod tests { bank_forks.write().unwrap().insert(root_bank); let mut progress = ProgressMap::default(); for i in 0..=root { - progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0)); + progress.insert( + i, + ForkProgress::new(Hash::default(), None, DuplicateStats::default(), None, 0, 0), + ); } + let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = + vec![root - 1, root, root + 1] + .into_iter() + .map(|s| (s, Hash::default())) + .collect(); ReplayStage::handle_new_root( root, &bank_forks, @@ -2280,10 +2552,19 @@ pub(crate) mod tests { &AbsRequestSender::default(), None, &mut heaviest_subtree_fork_choice, + &mut gossip_duplicate_confirmed_slots, ); assert_eq!(bank_forks.read().unwrap().root(), root); assert_eq!(progress.len(), 1); assert!(progress.get(&root).is_some()); + // root - 1 is filtered out + assert_eq!( + gossip_duplicate_confirmed_slots + .keys() + .cloned() + .collect::>(), + vec![root, root + 1] + ); } #[test] @@ -2315,7 +2596,10 @@ pub(crate) mod tests { let mut heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new(root); let mut progress = ProgressMap::default(); for i in 0..=root { - progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0)); + progress.insert( + i, + ForkProgress::new(Hash::default(), None, DuplicateStats::default(), None, 0, 0), + ); } ReplayStage::handle_new_root( root, @@ -2324,6 +2608,7 @@ pub(crate) mod tests { &AbsRequestSender::default(), Some(confirmed_root), &mut heaviest_subtree_fork_choice, + &mut BTreeMap::new(), ); assert_eq!(bank_forks.read().unwrap().root(), root); assert!(bank_forks.read().unwrap().get(confirmed_root).is_some()); @@ -2565,9 +2850,9 @@ pub(crate) mod tests { let bank0 = bank_forks.working_bank(); let mut progress = ProgressMap::default(); let last_blockhash = bank0.last_blockhash(); - let mut bank0_progress = progress - .entry(bank0.slot()) - .or_insert_with(|| ForkProgress::new(last_blockhash, None, None, 0, 0)); + let mut bank0_progress = progress.entry(bank0.slot()).or_insert_with(|| { + ForkProgress::new(last_blockhash, None, DuplicateStats::default(), None, 0, 0) + }); let shreds = shred_to_insert(&mint_keypair, bank0.clone()); blockstore.insert_shreds(shreds, None, false).unwrap(); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); @@ -2580,14 +2865,29 @@ pub(crate) mod tests { None, &replay_vote_sender, 
&&VerifyRecyclers::default(), - &Arc::new(RpcSubscriptions::new( - &exit, - bank_forks.clone(), - block_commitment_cache, - OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), - )), ); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + bank_forks.clone(), + block_commitment_cache, + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + )); + if let Err(err) = &res { + ReplayStage::mark_dead_slot( + &blockstore, + &bank0, + 0, + err, + &subscriptions, + &BTreeMap::new(), + &HashMap::new(), + &HashMap::new(), + &mut progress, + &mut HeaviestSubtreeForkChoice::new(0), + ); + } + // Check that the erroring bank was marked as dead in the progress map assert!(progress .get(&bank0.slot()) @@ -2875,7 +3175,6 @@ pub(crate) mod tests { // bank 0 has no votes, should not send any votes on the channel assert_eq!(newly_computed, vec![0]); - // The only vote is in bank 1, and bank_forks does not currently contain // bank 1, so no slot should be confirmed. { @@ -2888,14 +3187,21 @@ pub(crate) mod tests { &bank_forks, ); - assert!(confirmed_forks.is_empty()) + assert!(confirmed_forks.is_empty()); } // Insert the bank that contains a vote for slot 0, which confirms slot 0 bank_forks.write().unwrap().insert(bank1); progress.insert( 1, - ForkProgress::new(bank0.last_blockhash(), None, None, 0, 0), + ForkProgress::new( + bank0.last_blockhash(), + None, + DuplicateStats::default(), + None, + 0, + 0, + ), ); let ancestors = bank_forks.read().unwrap().ancestors(); let mut frozen_banks: Vec<_> = bank_forks @@ -2928,6 +3234,7 @@ pub(crate) mod tests { &progress, &bank_forks, ); + // No new stats should have been computed assert_eq!(confirmed_forks, vec![0]); } @@ -3294,6 +3601,7 @@ pub(crate) mod tests { ForkProgress::new( Hash::default(), Some(9), + DuplicateStats::default(), Some(ValidatorStakeInfo { total_epoch_stake, ..ValidatorStakeInfo::default() @@ -3307,6 +3615,7 @@ pub(crate) mod tests { ForkProgress::new( Hash::default(), Some(8), + DuplicateStats::default(), Some(ValidatorStakeInfo { total_epoch_stake, ..ValidatorStakeInfo::default() @@ -3389,6 +3698,7 @@ pub(crate) mod tests { ForkProgress::new( Hash::default(), Some(prev_leader_slot), + DuplicateStats::default(), { if i % 2 == 0 { Some(ValidatorStakeInfo { @@ -3468,6 +3778,7 @@ pub(crate) mod tests { let mut fork_progress = ForkProgress::new( Hash::default(), Some(prev_leader_slot), + DuplicateStats::default(), Some(ValidatorStakeInfo { total_epoch_stake, ..ValidatorStakeInfo::default() @@ -3527,7 +3838,7 @@ pub(crate) mod tests { // should succeed progress_map.insert( parent_slot, - ForkProgress::new(Hash::default(), None, None, 0, 0), + ForkProgress::new(Hash::default(), None, DuplicateStats::default(), None, 0, 0), ); assert!(ReplayStage::check_propagation_for_start_leader( poh_slot, @@ -3543,6 +3854,7 @@ pub(crate) mod tests { ForkProgress::new( Hash::default(), None, + DuplicateStats::default(), Some(ValidatorStakeInfo::default()), 0, 0, @@ -3569,13 +3881,21 @@ pub(crate) mod tests { let previous_leader_slot = parent_slot - 1; progress_map.insert( parent_slot, - ForkProgress::new(Hash::default(), Some(previous_leader_slot), None, 0, 0), + ForkProgress::new( + Hash::default(), + Some(previous_leader_slot), + DuplicateStats::default(), + None, + 0, + 0, + ), ); progress_map.insert( previous_leader_slot, ForkProgress::new( Hash::default(), None, + DuplicateStats::default(), Some(ValidatorStakeInfo::default()), 0, 0, @@ -3636,6 +3956,7 @@ pub(crate) mod tests { ForkProgress::new( Hash::default(), 
None, + DuplicateStats::default(), Some(ValidatorStakeInfo::default()), 0, 0, @@ -3671,6 +3992,7 @@ pub(crate) mod tests { ForkProgress::new( Hash::default(), None, + DuplicateStats::default(), Some(ValidatorStakeInfo::default()), 0, 0, @@ -3694,6 +4016,7 @@ pub(crate) mod tests { ForkProgress::new( Hash::default(), None, + DuplicateStats::default(), Some(ValidatorStakeInfo::default()), 0, 0, @@ -3880,9 +4203,198 @@ pub(crate) mod tests { assert!(progress.is_propagated(root_bank.slot())); } + #[test] + fn test_unconfirmed_duplicate_slots_and_lockouts() { + /* + Build fork structure: + + slot 0 + | + slot 1 + / \ + slot 2 | + | | + slot 3 | + | | + slot 4 | + slot 5 + | + slot 6 + */ + let forks = tr(0) / (tr(1) / (tr(2) / (tr(3) / (tr(4)))) / (tr(5) / (tr(6)))); + + // Make enough validators for vote switch thrshold later + let mut vote_simulator = VoteSimulator::new(2); + let validator_votes: HashMap> = vec![ + (vote_simulator.node_pubkeys[0], vec![5]), + (vote_simulator.node_pubkeys[1], vec![2]), + ] + .into_iter() + .collect(); + vote_simulator.fill_bank_forks(forks, &validator_votes); + + let (bank_forks, mut progress) = (vote_simulator.bank_forks, vote_simulator.progress); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new( + Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger"), + ); + let mut tower = Tower::new_for_tests(8, 0.67); + let mut heaviest_subtree_fork_choice = + HeaviestSubtreeForkChoice::new_from_bank_forks(&bank_forks.read().unwrap()); + + // All forks have same weight so heaviest bank to vote/reset on should be the tip of + // the fork with the lower slot + let (vote_fork, reset_fork) = run_compute_and_select_forks( + &bank_forks, + &mut progress, + &mut tower, + &mut heaviest_subtree_fork_choice, + ); + assert_eq!(vote_fork.unwrap(), 4); + assert_eq!(reset_fork.unwrap(), 4); + + // Record the vote for 4 + tower.record_bank_vote( + &bank_forks.read().unwrap().get(4).unwrap(), + &Pubkey::default(), + ); + + // Mark 4 as duplicate, 3 should be the heaviest slot, but should not be votable + // because of lockout + blockstore.store_duplicate_slot(4, vec![], vec![]).unwrap(); + let ancestors = bank_forks.read().unwrap().ancestors(); + let descendants = bank_forks.read().unwrap().descendants().clone(); + let mut gossip_duplicate_confirmed_slots = BTreeMap::new(); + let bank4_hash = bank_forks.read().unwrap().get(4).unwrap().hash(); + assert_ne!(bank4_hash, Hash::default()); + check_slot_agrees_with_cluster( + 4, + bank_forks.read().unwrap().root(), + Some(bank4_hash), + &gossip_duplicate_confirmed_slots, + &ancestors, + &descendants, + &mut progress, + &mut heaviest_subtree_fork_choice, + SlotStateUpdate::Duplicate, + ); + + let (vote_fork, reset_fork) = run_compute_and_select_forks( + &bank_forks, + &mut progress, + &mut tower, + &mut heaviest_subtree_fork_choice, + ); + assert!(vote_fork.is_none()); + assert_eq!(reset_fork.unwrap(), 3); + + // Now mark 2, an ancestor of 4, as duplicate + blockstore.store_duplicate_slot(2, vec![], vec![]).unwrap(); + let ancestors = bank_forks.read().unwrap().ancestors(); + let descendants = bank_forks.read().unwrap().descendants().clone(); + let bank2_hash = bank_forks.read().unwrap().get(2).unwrap().hash(); + assert_ne!(bank2_hash, Hash::default()); + check_slot_agrees_with_cluster( + 2, + bank_forks.read().unwrap().root(), + Some(bank2_hash), + &gossip_duplicate_confirmed_slots, + &ancestors, + &descendants, + &mut progress, + &mut heaviest_subtree_fork_choice, + 
SlotStateUpdate::Duplicate, + ); + + let (vote_fork, reset_fork) = run_compute_and_select_forks( + &bank_forks, + &mut progress, + &mut tower, + &mut heaviest_subtree_fork_choice, + ); + + // Should now pick the next heaviest fork that is not a descendant of 2, which is 6. + // However the lockout from vote 4 should still apply, so 6 should not be votable + assert!(vote_fork.is_none()); + assert_eq!(reset_fork.unwrap(), 6); + + // If slot 4 is marked as confirmed, then this confirms slot 2 and 4, and + // then slot 4 is now the heaviest bank again + gossip_duplicate_confirmed_slots.insert(4, bank4_hash); + check_slot_agrees_with_cluster( + 4, + bank_forks.read().unwrap().root(), + Some(bank4_hash), + &gossip_duplicate_confirmed_slots, + &ancestors, + &descendants, + &mut progress, + &mut heaviest_subtree_fork_choice, + SlotStateUpdate::DuplicateConfirmed, + ); + let (vote_fork, reset_fork) = run_compute_and_select_forks( + &bank_forks, + &mut progress, + &mut tower, + &mut heaviest_subtree_fork_choice, + ); + // Should now pick the heaviest fork 4 again, but lockouts apply so fork 4 + // is not votable, which avoids voting for 4 again. + assert!(vote_fork.is_none()); + assert_eq!(reset_fork.unwrap(), 4); + } + + fn run_compute_and_select_forks( + bank_forks: &RwLock, + progress: &mut ProgressMap, + tower: &mut Tower, + heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, + ) -> (Option, Option) { + let mut frozen_banks: Vec<_> = bank_forks + .read() + .unwrap() + .frozen_banks() + .values() + .cloned() + .collect(); + let ancestors = &bank_forks.read().unwrap().ancestors(); + let descendants = &bank_forks.read().unwrap().descendants().clone(); + ReplayStage::compute_bank_stats( + &Pubkey::default(), + &bank_forks.read().unwrap().ancestors(), + &mut frozen_banks, + tower, + progress, + &VoteTracker::default(), + &ClusterSlots::default(), + &bank_forks, + heaviest_subtree_fork_choice, + ); + let (heaviest_bank, heaviest_bank_on_same_fork) = heaviest_subtree_fork_choice + .select_forks(&frozen_banks, &tower, &progress, &ancestors, bank_forks); + assert!(heaviest_bank_on_same_fork.is_none()); + let SelectVoteAndResetForkResult { + vote_bank, + reset_bank, + .. 
+ } = ReplayStage::select_vote_and_reset_forks( + &heaviest_bank, + heaviest_bank_on_same_fork.as_ref(), + &ancestors, + &descendants, + progress, + tower, + ); + ( + vote_bank.map(|(b, _)| b.slot()), + reset_bank.map(|b| b.slot()), + ) + } + fn setup_forks() -> (RwLock, ProgressMap) { /* Build fork structure: + slot 0 | slot 1 diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 55eba71c92..327b4ca75e 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -16,7 +16,7 @@ use crate::{ rpc_subscriptions::RpcSubscriptions, window_service::{should_retransmit_and_persist, WindowService}, }; -use crossbeam_channel::Receiver; +use crossbeam_channel::{Receiver, Sender}; use lru::LruCache; use solana_client::rpc_response::SlotUpdate; use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}; @@ -605,6 +605,7 @@ impl RetransmitStage { completed_data_sets_sender: CompletedDataSetsSender, max_slots: &Arc, rpc_subscriptions: Option>, + duplicate_slots_sender: Sender, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); @@ -666,6 +667,7 @@ impl RetransmitStage { cluster_slots, verified_vote_receiver, completed_data_sets_sender, + duplicate_slots_sender, ); let mut thread_hdls = t_retransmit; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 4b463cd364..23fbfd5741 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -5,7 +5,10 @@ use crate::{ banking_stage::BankingStage, broadcast_stage::{BroadcastStage, BroadcastStageType, RetransmitSlotsReceiver}, cluster_info::ClusterInfo, - cluster_info_vote_listener::{ClusterInfoVoteListener, VerifiedVoteSender, VoteTracker}, + cluster_info_vote_listener::{ + ClusterInfoVoteListener, GossipDuplicateConfirmedSlotsSender, VerifiedVoteSender, + VoteTracker, + }, fetch_stage::FetchStage, optimistically_confirmed_bank_tracker::BankNotificationSender, poh_recorder::{PohRecorder, WorkingBankEntry}, @@ -62,6 +65,7 @@ impl Tpu { replay_vote_sender: ReplayVoteSender, bank_notification_sender: Option, tpu_coalesce_ms: u64, + cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender, ) -> Self { let (packet_sender, packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( @@ -95,6 +99,7 @@ impl Tpu { replay_vote_receiver, blockstore.clone(), bank_notification_sender, + cluster_confirmed_slot_sender, ); let banking_stage = BankingStage::new( diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 4b033b2032..42b2cb7393 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -6,7 +6,9 @@ use crate::{ broadcast_stage::RetransmitSlotsSender, cache_block_time_service::CacheBlockTimeSender, cluster_info::ClusterInfo, - cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker}, + cluster_info_vote_listener::{ + GossipDuplicateConfirmedSlotsReceiver, VerifiedVoteReceiver, VoteTracker, + }, cluster_slots::ClusterSlots, completed_data_sets_service::CompletedDataSetsSender, consensus::Tower, @@ -120,6 +122,7 @@ impl Tvu { replay_vote_sender: ReplayVoteSender, completed_data_sets_sender: CompletedDataSetsSender, bank_notification_sender: Option, + gossip_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver, tvu_config: TvuConfig, max_slots: &Arc, ) -> Self { @@ -159,6 +162,7 @@ impl Tvu { let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver) = unbounded(); let compaction_interval = tvu_config.rocksdb_compaction_interval; let max_compaction_jitter = tvu_config.rocksdb_max_compaction_jitter; + let (duplicate_slots_sender, duplicate_slots_receiver) = 
unbounded(); let retransmit_stage = RetransmitStage::new( bank_forks.clone(), leader_schedule_cache, @@ -179,6 +183,7 @@ impl Tvu { completed_data_sets_sender, max_slots, Some(subscriptions.clone()), + duplicate_slots_sender, ); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); @@ -262,6 +267,7 @@ impl Tvu { bank_forks.clone(), cluster_info.clone(), ledger_signal_receiver, + duplicate_slots_receiver, poh_recorder.clone(), tower, vote_tracker, @@ -269,6 +275,7 @@ impl Tvu { retransmit_slots_sender, duplicate_slots_reset_receiver, replay_vote_sender, + gossip_confirmed_slots_receiver, ); let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { @@ -371,6 +378,7 @@ pub mod tests { let (_verified_vote_sender, verified_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (completed_data_sets_sender, _completed_data_sets_receiver) = unbounded(); + let (_, gossip_confirmed_slots_receiver) = unbounded(); let bank_forks = Arc::new(RwLock::new(bank_forks)); let tower = Tower::new_with_key(&target1_keypair.pubkey()); let tvu = Tvu::new( @@ -411,6 +419,7 @@ pub mod tests { replay_vote_sender, completed_data_sets_sender, None, + gossip_confirmed_slots_receiver, TvuConfig::default(), &Arc::new(MaxSlots::default()), ); diff --git a/core/src/validator.rs b/core/src/validator.rs index f75b5897a8..00da1a87f7 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -660,6 +660,7 @@ impl Validator { let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded(); + let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded(); let tvu = Tvu::new( vote_account, authorized_voter_keypairs, @@ -710,6 +711,7 @@ impl Validator { replay_vote_sender.clone(), completed_data_sets_sender, bank_notification_sender.clone(), + cluster_confirmed_slot_receiver, TvuConfig { max_ledger_shreds: config.max_ledger_shreds, halt_on_trusted_validators_accounts_hash_mismatch: config @@ -748,6 +750,7 @@ impl Validator { replay_vote_sender, bank_notification_sender, config.tpu_coalesce_ms, + cluster_confirmed_slot_sender, ); datapoint_info!("validator-new", ("id", id.to_string(), String)); diff --git a/core/src/vote_stake_tracker.rs b/core/src/vote_stake_tracker.rs index 0807df2208..feaede7bdc 100644 --- a/core/src/vote_stake_tracker.rs +++ b/core/src/vote_stake_tracker.rs @@ -1,4 +1,3 @@ -use solana_runtime::commitment::VOTE_THRESHOLD_SIZE; use solana_sdk::pubkey::Pubkey; use std::collections::HashSet; @@ -9,29 +8,33 @@ pub struct VoteStakeTracker { } impl VoteStakeTracker { - // Returns tuple (is_confirmed, is_new) where - // `is_confirmed` is true if the stake that has voted has just crosssed the supermajority - // of stake + // Returns tuple (reached_threshold_results, is_new) where + // Each index in `reached_threshold_results` is true if the corresponding threshold in the input + // `thresholds_to_check` was newly reached by adding the stake of the input `vote_pubkey` // `is_new` is true if the vote has not been seen before pub fn add_vote_pubkey( &mut self, vote_pubkey: Pubkey, stake: u64, total_stake: u64, - ) -> (bool, bool) { + thresholds_to_check: &[f64], + ) -> (Vec, bool) { let is_new = !self.voted.contains(&vote_pubkey); if is_new { self.voted.insert(vote_pubkey); - let supermajority_stake = (total_stake as f64 * VOTE_THRESHOLD_SIZE) as u64; let old_stake = self.stake; let new_stake = self.stake + stake; self.stake = 
diff --git a/core/src/vote_stake_tracker.rs b/core/src/vote_stake_tracker.rs
index 0807df2208..feaede7bdc 100644
--- a/core/src/vote_stake_tracker.rs
+++ b/core/src/vote_stake_tracker.rs
@@ -1,4 +1,3 @@
-use solana_runtime::commitment::VOTE_THRESHOLD_SIZE;
 use solana_sdk::pubkey::Pubkey;
 use std::collections::HashSet;
 
@@ -9,29 +8,33 @@ pub struct VoteStakeTracker {
 }
 
 impl VoteStakeTracker {
-    // Returns tuple (is_confirmed, is_new) where
-    // `is_confirmed` is true if the stake that has voted has just crosssed the supermajority
-    // of stake
+    // Returns tuple (reached_threshold_results, is_new) where
+    // Each index in `reached_threshold_results` is true if the corresponding threshold in the input
+    // `thresholds_to_check` was newly reached by adding the stake of the input `vote_pubkey`
     // `is_new` is true if the vote has not been seen before
     pub fn add_vote_pubkey(
         &mut self,
         vote_pubkey: Pubkey,
         stake: u64,
         total_stake: u64,
-    ) -> (bool, bool) {
+        thresholds_to_check: &[f64],
+    ) -> (Vec<bool>, bool) {
         let is_new = !self.voted.contains(&vote_pubkey);
         if is_new {
             self.voted.insert(vote_pubkey);
-            let supermajority_stake = (total_stake as f64 * VOTE_THRESHOLD_SIZE) as u64;
             let old_stake = self.stake;
             let new_stake = self.stake + stake;
             self.stake = new_stake;
-            (
-                old_stake <= supermajority_stake && supermajority_stake < new_stake,
-                is_new,
-            )
+            let reached_threshold_results: Vec<bool> = thresholds_to_check
+                .iter()
+                .map(|threshold| {
+                    let threshold_stake = (total_stake as f64 * threshold) as u64;
+                    old_stake <= threshold_stake && threshold_stake < new_stake
+                })
+                .collect();
+            (reached_threshold_results, is_new)
         } else {
-            (false, is_new)
+            (vec![false; thresholds_to_check.len()], is_new)
         }
     }
 
@@ -47,6 +50,7 @@ impl VoteStakeTracker {
 #[cfg(test)]
 mod test {
     use super::*;
+    use solana_runtime::commitment::VOTE_THRESHOLD_SIZE;
 
     #[test]
     fn test_add_vote_pubkey() {
@@ -54,24 +58,43 @@ mod test {
         let mut vote_stake_tracker = VoteStakeTracker::default();
         for i in 0..10 {
             let pubkey = solana_sdk::pubkey::new_rand();
-            let (is_confirmed, is_new) =
-                vote_stake_tracker.add_vote_pubkey(pubkey, 1, total_epoch_stake);
+            let (is_confirmed_thresholds, is_new) = vote_stake_tracker.add_vote_pubkey(
+                pubkey,
+                1,
+                total_epoch_stake,
+                &[VOTE_THRESHOLD_SIZE, 0.0],
+            );
             let stake = vote_stake_tracker.stake();
-            let (is_confirmed2, is_new2) =
-                vote_stake_tracker.add_vote_pubkey(pubkey, 1, total_epoch_stake);
+            let (is_confirmed_thresholds2, is_new2) = vote_stake_tracker.add_vote_pubkey(
+                pubkey,
+                1,
+                total_epoch_stake,
+                &[VOTE_THRESHOLD_SIZE, 0.0],
+            );
             let stake2 = vote_stake_tracker.stake();
 
             // Stake should not change from adding same pubkey twice
             assert_eq!(stake, stake2);
-            assert!(!is_confirmed2);
+            assert!(!is_confirmed_thresholds2[0]);
+            assert!(!is_confirmed_thresholds2[1]);
             assert!(!is_new2);
+            assert_eq!(is_confirmed_thresholds.len(), 2);
+            assert_eq!(is_confirmed_thresholds2.len(), 2);
 
             // at i == 6, the voted stake is 70%, which is the first time crossing
             // the supermajority threshold
             if i == 6 {
-                assert!(is_confirmed);
+                assert!(is_confirmed_thresholds[0]);
             } else {
-                assert!(!is_confirmed);
+                assert!(!is_confirmed_thresholds[0]);
+            }
+
+            // at i == 0, the voted stake is 10%, which is the first time crossing
+            // the 0% threshold
+            if i == 0 {
+                assert!(is_confirmed_thresholds[1]);
+            } else {
+                assert!(!is_confirmed_thresholds[1]);
             }
             assert!(is_new);
         }
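The reworked `add_vote_pubkey` reports, for each entry of `thresholds_to_check`, whether this particular vote moved the accumulated stake across that fraction of `total_stake`, so only the crossing vote triggers a `true`. Below is a self-contained sketch that mirrors the threshold check from the closure above; the helper name `newly_reached` is made up for illustration and is not part of the patch.

```rust
// Standalone sketch mirroring the per-threshold check added to add_vote_pubkey.
// A threshold is reported as newly reached only on the vote that crosses it.
fn newly_reached(old_stake: u64, new_stake: u64, total_stake: u64, thresholds: &[f64]) -> Vec<bool> {
    thresholds
        .iter()
        .map(|threshold| {
            let threshold_stake = (total_stake as f64 * *threshold) as u64;
            old_stake <= threshold_stake && threshold_stake < new_stake
        })
        .collect()
}

fn main() {
    let total_stake = 10;
    // Ten validators with 1 stake each vote one after another; check a 2/3 and a 0% threshold.
    let mut tally = 0;
    for i in 0..10 {
        let crossed = newly_reached(tally, tally + 1, total_stake, &[2.0 / 3.0, 0.0]);
        tally += 1;
        if crossed[0] {
            println!("supermajority first reached at vote {}", i); // prints 6
        }
        if crossed[1] {
            println!("first stake observed at vote {}", i); // prints 0
        }
    }
}
```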
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 614c2f26c6..1fca87e701 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -26,7 +26,7 @@ use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
 use solana_perf::packet::Packets;
 use solana_rayon_threadlimit::get_thread_count;
 use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
+use solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
 use solana_streamer::streamer::PacketSender;
 use std::{
     net::{SocketAddr, UdpSocket},
@@ -36,6 +36,9 @@ use std::{
     time::{Duration, Instant},
 };
 
+pub type DuplicateSlotSender = CrossbeamSender<Slot>;
+pub type DuplicateSlotReceiver = CrossbeamReceiver<Slot>;
+
 fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
     if shred.is_data() {
         // Only data shreds have parent information
@@ -86,21 +89,25 @@ fn run_check_duplicate(
     cluster_info: &ClusterInfo,
     blockstore: &Blockstore,
     shred_receiver: &CrossbeamReceiver<Shred>,
+    duplicate_slot_sender: &DuplicateSlotSender,
 ) -> Result<()> {
     let check_duplicate = |shred: Shred| -> Result<()> {
-        if !blockstore.has_duplicate_shreds_in_slot(shred.slot()) {
+        let shred_slot = shred.slot();
+        if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
             if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(
-                shred.slot(),
+                shred_slot,
                 shred.index(),
                 &shred.payload,
                 shred.is_data(),
             ) {
                 cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
                 blockstore.store_duplicate_slot(
-                    shred.slot(),
+                    shred_slot,
                     existing_shred_payload,
                     shred.payload,
                 )?;
+
+                duplicate_slot_sender.send(shred_slot)?;
             }
         }
 
@@ -319,6 +326,7 @@ impl WindowService {
         cluster_slots: Arc<ClusterSlots>,
         verified_vote_receiver: VerifiedVoteReceiver,
         completed_data_sets_sender: CompletedDataSetsSender,
+        duplicate_slots_sender: DuplicateSlotSender,
     ) -> WindowService
     where
         F: 'static
@@ -346,6 +354,7 @@ impl WindowService {
             exit.clone(),
             blockstore.clone(),
             duplicate_receiver,
+            duplicate_slots_sender,
         );
 
         let t_insert = Self::start_window_insert_thread(
@@ -381,6 +390,7 @@ impl WindowService {
         exit: Arc<AtomicBool>,
         blockstore: Arc<Blockstore>,
         duplicate_receiver: CrossbeamReceiver<Shred>,
+        duplicate_slot_sender: DuplicateSlotSender,
     ) -> JoinHandle<()> {
         let handle_error = || {
             inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
@@ -393,8 +403,12 @@ impl WindowService {
             }
 
             let mut noop = || {};
-            if let Err(e) = run_check_duplicate(&cluster_info, &blockstore, &duplicate_receiver)
-            {
+            if let Err(e) = run_check_duplicate(
+                &cluster_info,
+                &blockstore,
+                &duplicate_receiver,
+                &duplicate_slot_sender,
+            ) {
                 if Self::should_exit_on_error(e, &mut noop, &handle_error) {
                     break;
                 }
@@ -408,7 +422,7 @@ impl WindowService {
         blockstore: &Arc<Blockstore>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
-        duplicate_sender: CrossbeamSender<Shred>,
+        check_duplicate_sender: CrossbeamSender<Shred>,
         completed_data_sets_sender: CompletedDataSetsSender,
     ) -> JoinHandle<()> {
         let exit = exit.clone();
@@ -423,7 +437,7 @@ impl WindowService {
             .name("solana-window-insert".to_string())
             .spawn(move || {
                 let handle_duplicate = |shred| {
-                    let _ = duplicate_sender.send(shred);
+                    let _ = check_duplicate_sender.send(shred);
                 };
                 let mut metrics = BlockstoreInsertionMetrics::default();
                 let mut last_print = Instant::now();
@@ -538,6 +552,7 @@ impl WindowService {
                 handle_timeout();
                 false
             }
+            Error::CrossbeamSendError => true,
             _ => {
                 handle_error();
                 error!("thread {:?} error {:?}", thread::current().name(), e);
@@ -566,7 +581,6 @@ mod test {
         shred::{DataShredHeader, Shredder},
     };
     use solana_sdk::{
-        clock::Slot,
         epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
         hash::Hash,
         signature::{Keypair, Signer},
@@ -680,6 +694,7 @@ mod test {
         let blockstore_path = get_tmp_ledger_path!();
         let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
         let (sender, receiver) = unbounded();
+        let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
         let (shreds, _) = make_many_slot_entries(5, 5, 10);
         blockstore
             .insert_shreds(shreds.clone(), None, false)
@@ -692,7 +707,17 @@ mod test {
         let keypair = Keypair::new();
         let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
         let cluster_info = ClusterInfo::new(contact_info, Arc::new(keypair));
-        run_check_duplicate(&cluster_info, &blockstore, &receiver).unwrap();
+        run_check_duplicate(
+            &cluster_info,
+            &blockstore,
+            &receiver,
+            &duplicate_slot_sender,
+        )
+        .unwrap();
         assert!(blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
+        assert_eq!(
+            duplicate_slot_receiver.try_recv().unwrap(),
+            duplicate_shred_slot
+        );
     }
 }
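For context on how the new channel is consumed: `run_check_duplicate` now reports the slot of every newly detected duplicate, and the receiver end is plumbed to ReplayStage through Tvu in the hunks above. The sketch below shows a non-blocking drain of such a channel; it redefines the two aliases locally so it stands alone, and `drain_duplicate_slots` is an illustrative name rather than code from this patch.

```rust
// Illustrative consumer sketch for the duplicate-slot channel (not from this patch).
use crossbeam_channel::{unbounded, Receiver, Sender};

type Slot = u64;
type DuplicateSlotSender = Sender<Slot>;
type DuplicateSlotReceiver = Receiver<Slot>;

fn drain_duplicate_slots(receiver: &DuplicateSlotReceiver) -> Vec<Slot> {
    // `try_iter` yields only what is already queued, so the caller never blocks.
    receiver.try_iter().collect()
}

fn main() {
    let (sender, receiver): (DuplicateSlotSender, DuplicateSlotReceiver) = unbounded();

    // Producer side, as in `run_check_duplicate`: report the slot once per detected duplicate.
    sender.send(17).unwrap();
    sender.send(19).unwrap();

    // Consumer side: process whatever arrived since the last poll.
    for slot in drain_duplicate_slots(&receiver) {
        println!("marking slot {} as duplicate", slot);
    }
}
```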