fix duplicate confirmed rollup detection for descendants (#34014)

* fix duplicate confirmed rollup detection for descendants

* pr feedback: optimistic rename -> guard new enum
This commit is contained in:
Ashwin Sekar 2024-01-10 15:10:30 -05:00 committed by GitHub
parent fb35552f5b
commit fb97e93fe3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 146 additions and 35 deletions

View File

@ -1,3 +1,5 @@
use crate::replay_stage::DUPLICATE_THRESHOLD;
pub mod fork_choice; pub mod fork_choice;
pub mod heaviest_subtree_fork_choice; pub mod heaviest_subtree_fork_choice;
pub(crate) mod latest_validator_votes_for_frozen_banks; pub(crate) mod latest_validator_votes_for_frozen_banks;
@ -444,7 +446,7 @@ impl Tower {
} }
} }
pub fn is_slot_confirmed( pub(crate) fn is_slot_confirmed(
&self, &self,
slot: Slot, slot: Slot,
voted_stakes: &VotedStakes, voted_stakes: &VotedStakes,
@ -456,6 +458,18 @@ impl Tower {
.unwrap_or(false) .unwrap_or(false)
} }
pub(crate) fn is_slot_duplicate_confirmed(
&self,
slot: Slot,
voted_stakes: &VotedStakes,
total_stake: Stake,
) -> bool {
voted_stakes
.get(&slot)
.map(|stake| (*stake as f64 / total_stake as f64) > DUPLICATE_THRESHOLD)
.unwrap_or(false)
}
pub fn tower_slots(&self) -> Vec<Slot> { pub fn tower_slots(&self) -> Vec<Slot> {
self.vote_state.tower() self.vote_state.tower()
} }
@ -2378,6 +2392,27 @@ pub mod test {
assert!(tower.is_slot_confirmed(0, &stakes, 2)); assert!(tower.is_slot_confirmed(0, &stakes, 2));
} }
#[test]
fn test_is_slot_duplicate_confirmed_not_enough_stake_failure() {
let tower = Tower::new_for_tests(1, 0.67);
let stakes = vec![(0, 52)].into_iter().collect();
assert!(!tower.is_slot_duplicate_confirmed(0, &stakes, 100));
}
#[test]
fn test_is_slot_duplicate_confirmed_unknown_slot() {
let tower = Tower::new_for_tests(1, 0.67);
let stakes = HashMap::new();
assert!(!tower.is_slot_duplicate_confirmed(0, &stakes, 100));
}
#[test]
fn test_is_slot_duplicate_confirmed_pass() {
let tower = Tower::new_for_tests(1, 0.67);
let stakes = vec![(0, 53)].into_iter().collect();
assert!(tower.is_slot_duplicate_confirmed(0, &stakes, 100));
}
#[test] #[test]
fn test_is_locked_out_empty() { fn test_is_locked_out_empty() {
let tower = Tower::new_for_tests(0, 0.67); let tower = Tower::new_for_tests(0, 0.67);

View File

@ -131,6 +131,37 @@ pub enum HeaviestForkFailures {
), ),
} }
#[derive(PartialEq, Eq, Debug)]
enum ConfirmationType {
SupermajorityVoted,
DuplicateConfirmed,
}
#[derive(PartialEq, Eq, Debug)]
struct ConfirmedSlot {
slot: Slot,
frozen_hash: Hash,
confirmation_type: ConfirmationType,
}
impl ConfirmedSlot {
fn new_supermajority_voted(slot: Slot, frozen_hash: Hash) -> Self {
Self {
slot,
frozen_hash,
confirmation_type: ConfirmationType::SupermajorityVoted,
}
}
fn new_duplicate_confirmed_slot(slot: Slot, frozen_hash: Hash) -> Self {
Self {
slot,
frozen_hash,
confirmation_type: ConfirmationType::DuplicateConfirmed,
}
}
}
// Implement a destructor for the ReplayStage thread to signal it exited // Implement a destructor for the ReplayStage thread to signal it exited
// even on panics // even on panics
struct Finalizer { struct Finalizer {
@ -758,7 +789,7 @@ impl ReplayStage {
let mut compute_slot_stats_time = Measure::start("compute_slot_stats_time"); let mut compute_slot_stats_time = Measure::start("compute_slot_stats_time");
for slot in newly_computed_slot_stats { for slot in newly_computed_slot_stats {
let fork_stats = progress.get_fork_stats(slot).unwrap(); let fork_stats = progress.get_fork_stats(slot).unwrap();
let confirmed_forks = Self::confirm_forks( let confirmed_slots = Self::confirm_forks(
&tower, &tower,
&fork_stats.voted_stakes, &fork_stats.voted_stakes,
fork_stats.total_stake, fork_stats.total_stake,
@ -767,7 +798,7 @@ impl ReplayStage {
); );
Self::mark_slots_confirmed( Self::mark_slots_confirmed(
&confirmed_forks, &confirmed_slots,
&blockstore, &blockstore,
&bank_forks, &bank_forks,
&mut progress, &mut progress,
@ -777,6 +808,7 @@ impl ReplayStage {
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter, &mut purge_repair_slot_counter,
&mut duplicate_confirmed_slots,
); );
} }
compute_slot_stats_time.stop(); compute_slot_stats_time.stop();
@ -3834,7 +3866,7 @@ impl ReplayStage {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn mark_slots_confirmed( fn mark_slots_confirmed(
confirmed_forks: &[(Slot, Hash)], confirmed_slots: &[ConfirmedSlot],
blockstore: &Blockstore, blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>, bank_forks: &RwLock<BankForks>,
progress: &mut ProgressMap, progress: &mut ProgressMap,
@ -3844,9 +3876,16 @@ impl ReplayStage {
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter, purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
duplicate_confirmed_slots: &mut DuplicateConfirmedSlots,
) { ) {
let root_slot = bank_forks.read().unwrap().root(); let root_slot = bank_forks.read().unwrap().root();
for (slot, frozen_hash) in confirmed_forks.iter() { for ConfirmedSlot {
slot,
frozen_hash,
confirmation_type,
} in confirmed_slots.iter()
{
if *confirmation_type == ConfirmationType::SupermajorityVoted {
// This case should be guaranteed as false by confirm_forks() // This case should be guaranteed as false by confirm_forks()
if let Some(false) = progress.is_supermajority_confirmed(*slot) { if let Some(false) = progress.is_supermajority_confirmed(*slot) {
// Because supermajority confirmation will iterate through and update the // Because supermajority confirmation will iterate through and update the
@ -3856,6 +3895,25 @@ impl ReplayStage {
// If the slot was confirmed, then it must be frozen. Otherwise, we couldn't // If the slot was confirmed, then it must be frozen. Otherwise, we couldn't
// have replayed any of its descendants and figured out it was confirmed. // have replayed any of its descendants and figured out it was confirmed.
assert!(*frozen_hash != Hash::default()); assert!(*frozen_hash != Hash::default());
}
}
if *slot <= root_slot {
continue;
}
match confirmation_type {
ConfirmationType::SupermajorityVoted => (),
ConfirmationType::DuplicateConfirmed => (),
#[allow(unreachable_patterns)]
_ => panic!("programmer error"),
}
if let Some(prev_hash) = duplicate_confirmed_slots.insert(*slot, *frozen_hash) {
assert_eq!(prev_hash, *frozen_hash);
// Already processed this signal
return;
}
let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state( let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state(
*frozen_hash, *frozen_hash,
@ -3876,7 +3934,6 @@ impl ReplayStage {
); );
} }
} }
}
fn confirm_forks( fn confirm_forks(
tower: &Tower, tower: &Tower,
@ -3884,7 +3941,7 @@ impl ReplayStage {
total_stake: Stake, total_stake: Stake,
progress: &ProgressMap, progress: &ProgressMap,
bank_forks: &RwLock<BankForks>, bank_forks: &RwLock<BankForks>,
) -> Vec<(Slot, Hash)> { ) -> Vec<ConfirmedSlot> {
let mut confirmed_forks = vec![]; let mut confirmed_forks = vec![];
for (slot, prog) in progress.iter() { for (slot, prog) in progress.iter() {
if !prog.fork_stats.is_supermajority_confirmed { if !prog.fork_stats.is_supermajority_confirmed {
@ -3904,7 +3961,23 @@ impl ReplayStage {
if bank.is_frozen() && tower.is_slot_confirmed(*slot, voted_stakes, total_stake) { if bank.is_frozen() && tower.is_slot_confirmed(*slot, voted_stakes, total_stake) {
info!("validator fork confirmed {} {}ms", *slot, duration); info!("validator fork confirmed {} {}ms", *slot, duration);
datapoint_info!("validator-confirmation", ("duration_ms", duration, i64)); datapoint_info!("validator-confirmation", ("duration_ms", duration, i64));
confirmed_forks.push((*slot, bank.hash())); confirmed_forks
.push(ConfirmedSlot::new_supermajority_voted(*slot, bank.hash()));
} else if bank.is_frozen()
&& tower.is_slot_duplicate_confirmed(*slot, voted_stakes, total_stake)
{
info!(
"validator fork duplicate confirmed {} {}ms",
*slot, duration
);
datapoint_info!(
"validator-duplicate-confirmation",
("duration_ms", duration, i64)
);
confirmed_forks.push(ConfirmedSlot::new_duplicate_confirmed_slot(
*slot,
bank.hash(),
));
} else { } else {
debug!( debug!(
"validator fork not confirmed {} {}ms {:?}", "validator fork not confirmed {} {}ms {:?}",
@ -5213,7 +5286,10 @@ pub(crate) mod tests {
&bank_forks, &bank_forks,
); );
// No new stats should have been computed // No new stats should have been computed
assert_eq!(confirmed_forks, vec![(0, bank0.hash())]); assert_eq!(
confirmed_forks,
vec![ConfirmedSlot::new_supermajority_voted(0, bank0.hash())]
);
} }
let ancestors = bank_forks.read().unwrap().ancestors(); let ancestors = bank_forks.read().unwrap().ancestors();