From fb97e93fe3309b677bd94b4bbb256e329bd722d9 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Wed, 10 Jan 2024 15:10:30 -0500 Subject: [PATCH] fix duplicate confirmed rollup detection for descendants (#34014) * fix duplicate confirmed rollup detection for descendants * pr feedback: optimistic rename -> guard new enum --- core/src/consensus.rs | 37 +++++++++- core/src/replay_stage.rs | 144 ++++++++++++++++++++++++++++++--------- 2 files changed, 146 insertions(+), 35 deletions(-) diff --git a/core/src/consensus.rs b/core/src/consensus.rs index f23325f9b..917d373b5 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1,3 +1,5 @@ +use crate::replay_stage::DUPLICATE_THRESHOLD; + pub mod fork_choice; pub mod heaviest_subtree_fork_choice; pub(crate) mod latest_validator_votes_for_frozen_banks; @@ -444,7 +446,7 @@ impl Tower { } } - pub fn is_slot_confirmed( + pub(crate) fn is_slot_confirmed( &self, slot: Slot, voted_stakes: &VotedStakes, @@ -456,6 +458,18 @@ impl Tower { .unwrap_or(false) } + pub(crate) fn is_slot_duplicate_confirmed( + &self, + slot: Slot, + voted_stakes: &VotedStakes, + total_stake: Stake, + ) -> bool { + voted_stakes + .get(&slot) + .map(|stake| (*stake as f64 / total_stake as f64) > DUPLICATE_THRESHOLD) + .unwrap_or(false) + } + pub fn tower_slots(&self) -> Vec { self.vote_state.tower() } @@ -2378,6 +2392,27 @@ pub mod test { assert!(tower.is_slot_confirmed(0, &stakes, 2)); } + #[test] + fn test_is_slot_duplicate_confirmed_not_enough_stake_failure() { + let tower = Tower::new_for_tests(1, 0.67); + let stakes = vec![(0, 52)].into_iter().collect(); + assert!(!tower.is_slot_duplicate_confirmed(0, &stakes, 100)); + } + + #[test] + fn test_is_slot_duplicate_confirmed_unknown_slot() { + let tower = Tower::new_for_tests(1, 0.67); + let stakes = HashMap::new(); + assert!(!tower.is_slot_duplicate_confirmed(0, &stakes, 100)); + } + + #[test] + fn test_is_slot_duplicate_confirmed_pass() { + let tower = Tower::new_for_tests(1, 0.67); + let stakes = vec![(0, 53)].into_iter().collect(); + assert!(tower.is_slot_duplicate_confirmed(0, &stakes, 100)); + } + #[test] fn test_is_locked_out_empty() { let tower = Tower::new_for_tests(0, 0.67); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 92577a220..d03dfd082 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -131,6 +131,37 @@ pub enum HeaviestForkFailures { ), } +#[derive(PartialEq, Eq, Debug)] +enum ConfirmationType { + SupermajorityVoted, + DuplicateConfirmed, +} + +#[derive(PartialEq, Eq, Debug)] +struct ConfirmedSlot { + slot: Slot, + frozen_hash: Hash, + confirmation_type: ConfirmationType, +} + +impl ConfirmedSlot { + fn new_supermajority_voted(slot: Slot, frozen_hash: Hash) -> Self { + Self { + slot, + frozen_hash, + confirmation_type: ConfirmationType::SupermajorityVoted, + } + } + + fn new_duplicate_confirmed_slot(slot: Slot, frozen_hash: Hash) -> Self { + Self { + slot, + frozen_hash, + confirmation_type: ConfirmationType::DuplicateConfirmed, + } + } +} + // Implement a destructor for the ReplayStage thread to signal it exited // even on panics struct Finalizer { @@ -758,7 +789,7 @@ impl ReplayStage { let mut compute_slot_stats_time = Measure::start("compute_slot_stats_time"); for slot in newly_computed_slot_stats { let fork_stats = progress.get_fork_stats(slot).unwrap(); - let confirmed_forks = Self::confirm_forks( + let confirmed_slots = Self::confirm_forks( &tower, &fork_stats.voted_stakes, fork_stats.total_stake, @@ -767,7 +798,7 @@ impl ReplayStage { ); Self::mark_slots_confirmed( - &confirmed_forks, + &confirmed_slots, &blockstore, &bank_forks, &mut progress, @@ -777,6 +808,7 @@ impl ReplayStage { &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender, &mut purge_repair_slot_counter, + &mut duplicate_confirmed_slots, ); } compute_slot_stats_time.stop(); @@ -3834,7 +3866,7 @@ impl ReplayStage { #[allow(clippy::too_many_arguments)] fn mark_slots_confirmed( - confirmed_forks: &[(Slot, Hash)], + confirmed_slots: &[ConfirmedSlot], blockstore: &Blockstore, bank_forks: &RwLock, progress: &mut ProgressMap, @@ -3844,37 +3876,62 @@ impl ReplayStage { duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, purge_repair_slot_counter: &mut PurgeRepairSlotCounter, + duplicate_confirmed_slots: &mut DuplicateConfirmedSlots, ) { let root_slot = bank_forks.read().unwrap().root(); - for (slot, frozen_hash) in confirmed_forks.iter() { - // This case should be guaranteed as false by confirm_forks() - if let Some(false) = progress.is_supermajority_confirmed(*slot) { - // Because supermajority confirmation will iterate through and update the - // subtree in fork choice, only incur this cost if the slot wasn't already - // confirmed - progress.set_supermajority_confirmed_slot(*slot); - // If the slot was confirmed, then it must be frozen. Otherwise, we couldn't - // have replayed any of its descendants and figured out it was confirmed. - assert!(*frozen_hash != Hash::default()); - - let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state( - *frozen_hash, - || false, - || Some(*frozen_hash), - ); - check_slot_agrees_with_cluster( - *slot, - root_slot, - blockstore, - duplicate_slots_tracker, - epoch_slots_frozen_slots, - fork_choice, - duplicate_slots_to_repair, - ancestor_hashes_replay_update_sender, - purge_repair_slot_counter, - SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), - ); + for ConfirmedSlot { + slot, + frozen_hash, + confirmation_type, + } in confirmed_slots.iter() + { + if *confirmation_type == ConfirmationType::SupermajorityVoted { + // This case should be guaranteed as false by confirm_forks() + if let Some(false) = progress.is_supermajority_confirmed(*slot) { + // Because supermajority confirmation will iterate through and update the + // subtree in fork choice, only incur this cost if the slot wasn't already + // confirmed + progress.set_supermajority_confirmed_slot(*slot); + // If the slot was confirmed, then it must be frozen. Otherwise, we couldn't + // have replayed any of its descendants and figured out it was confirmed. + assert!(*frozen_hash != Hash::default()); + } } + + if *slot <= root_slot { + continue; + } + + match confirmation_type { + ConfirmationType::SupermajorityVoted => (), + ConfirmationType::DuplicateConfirmed => (), + #[allow(unreachable_patterns)] + _ => panic!("programmer error"), + } + + if let Some(prev_hash) = duplicate_confirmed_slots.insert(*slot, *frozen_hash) { + assert_eq!(prev_hash, *frozen_hash); + // Already processed this signal + return; + } + + let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state( + *frozen_hash, + || false, + || Some(*frozen_hash), + ); + check_slot_agrees_with_cluster( + *slot, + root_slot, + blockstore, + duplicate_slots_tracker, + epoch_slots_frozen_slots, + fork_choice, + duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, + purge_repair_slot_counter, + SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), + ); } } @@ -3884,7 +3941,7 @@ impl ReplayStage { total_stake: Stake, progress: &ProgressMap, bank_forks: &RwLock, - ) -> Vec<(Slot, Hash)> { + ) -> Vec { let mut confirmed_forks = vec![]; for (slot, prog) in progress.iter() { if !prog.fork_stats.is_supermajority_confirmed { @@ -3904,7 +3961,23 @@ impl ReplayStage { if bank.is_frozen() && tower.is_slot_confirmed(*slot, voted_stakes, total_stake) { info!("validator fork confirmed {} {}ms", *slot, duration); datapoint_info!("validator-confirmation", ("duration_ms", duration, i64)); - confirmed_forks.push((*slot, bank.hash())); + confirmed_forks + .push(ConfirmedSlot::new_supermajority_voted(*slot, bank.hash())); + } else if bank.is_frozen() + && tower.is_slot_duplicate_confirmed(*slot, voted_stakes, total_stake) + { + info!( + "validator fork duplicate confirmed {} {}ms", + *slot, duration + ); + datapoint_info!( + "validator-duplicate-confirmation", + ("duration_ms", duration, i64) + ); + confirmed_forks.push(ConfirmedSlot::new_duplicate_confirmed_slot( + *slot, + bank.hash(), + )); } else { debug!( "validator fork not confirmed {} {}ms {:?}", @@ -5213,7 +5286,10 @@ pub(crate) mod tests { &bank_forks, ); // No new stats should have been computed - assert_eq!(confirmed_forks, vec![(0, bank0.hash())]); + assert_eq!( + confirmed_forks, + vec![ConfirmedSlot::new_supermajority_voted(0, bank0.hash())] + ); } let ancestors = bank_forks.read().unwrap().ancestors();