From afafa624a3de9964045cda81e7127a975883f6f5 Mon Sep 17 00:00:00 2001 From: carllin Date: Thu, 10 Jun 2021 22:28:23 -0700 Subject: [PATCH] Account for duplicate before a bank is frozen or replayed (#17866) --- core/src/cluster_slot_state_verifier.rs | 117 +++++++++++++++++++++--- core/src/consensus.rs | 3 +- core/src/progress_map.rs | 1 + core/src/replay_stage.rs | 52 +++++++++-- 4 files changed, 150 insertions(+), 23 deletions(-) diff --git a/core/src/cluster_slot_state_verifier.rs b/core/src/cluster_slot_state_verifier.rs index 2bc47e2a75..5099b6487c 100644 --- a/core/src/cluster_slot_state_verifier.rs +++ b/core/src/cluster_slot_state_verifier.rs @@ -3,8 +3,9 @@ use crate::{ progress_map::ProgressMap, }; use solana_sdk::{clock::Slot, hash::Hash}; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +pub(crate) type DuplicateSlotsTracker = BTreeSet; pub(crate) type GossipDuplicateConfirmedSlots = BTreeMap; type SlotStateHandler = fn(Slot, &Hash, Option<&Hash>, bool, bool) -> Vec; @@ -234,10 +235,12 @@ fn apply_state_changes( } } +#[allow(clippy::too_many_arguments)] pub(crate) fn check_slot_agrees_with_cluster( slot: Slot, root: Slot, frozen_hash: Option, + duplicate_slots_tracker: &mut DuplicateSlotsTracker, gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, ancestors: &HashMap>, descendants: &HashMap>, @@ -258,6 +261,15 @@ pub(crate) fn check_slot_agrees_with_cluster( return; } + // Needs to happen before the frozen_hash.is_none() check below to account for duplicate + // signals arriving before the bank is constructed in replay. + if matches!(slot_state_update, SlotStateUpdate::Duplicate) { + // If this slot has already been processed before, return + if !duplicate_slots_tracker.insert(slot) { + return; + } + } + if frozen_hash.is_none() { // If the bank doesn't even exist in BankForks yet, // then there's nothing to do as replay of the slot @@ -275,18 +287,7 @@ pub(crate) fn check_slot_agrees_with_cluster( &frozen_hash, is_local_replay_duplicate_confirmed, ); - let mut is_slot_duplicate = - progress.is_unconfirmed_duplicate(slot).expect("If the frozen hash exists, then the slot must exist in bank forks and thus in progress map"); - if matches!(slot_state_update, SlotStateUpdate::Duplicate) { - if is_slot_duplicate { - // Already processed duplicate signal for this slot, no need to continue - return; - } else { - // Otherwise, mark the slot as duplicate so the appropriate state changes - // will trigger - is_slot_duplicate = true; - } - } + let is_slot_duplicate = duplicate_slots_tracker.contains(&slot); let is_dead = progress.is_dead(slot).expect("If the frozen hash exists, then the slot must exist in bank forks and thus in progress map"); info!( @@ -687,6 +688,84 @@ mod test { .unwrap()); } + fn run_test_state_duplicate_then_bank_frozen(initial_bank_hash: Option) { + // Common state + let InitialState { + mut heaviest_subtree_fork_choice, + mut progress, + ancestors, + descendants, + bank_forks, + .. + } = setup(); + + // Setup a duplicate slot state transition with the initial bank state of the duplicate slot + // determined by `initial_bank_hash`, which can be: + // 1) A default hash (unfrozen bank), + // 2) None (a slot that hasn't even started replay yet). + let root = 0; + let mut duplicate_slots_tracker = DuplicateSlotsTracker::default(); + let gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); + let duplicate_slot = 2; + check_slot_agrees_with_cluster( + duplicate_slot, + root, + initial_bank_hash, + &mut duplicate_slots_tracker, + &gossip_duplicate_confirmed_slots, + &ancestors, + &descendants, + &mut progress, + &mut heaviest_subtree_fork_choice, + SlotStateUpdate::Duplicate, + ); + + // Now freeze the bank + let frozen_duplicate_slot_hash = bank_forks + .read() + .unwrap() + .get(duplicate_slot) + .unwrap() + .hash(); + check_slot_agrees_with_cluster( + duplicate_slot, + root, + Some(frozen_duplicate_slot_hash), + &mut duplicate_slots_tracker, + &gossip_duplicate_confirmed_slots, + &ancestors, + &descendants, + &mut progress, + &mut heaviest_subtree_fork_choice, + SlotStateUpdate::Frozen, + ); + + // Progress map should have the correct updates, fork choice should mark duplicate + // as unvotable + assert!(progress.is_unconfirmed_duplicate(duplicate_slot).unwrap()); + + // The ancestor of the duplicate slot should be the best slot now + let (duplicate_ancestor, duplicate_parent_hash) = { + let r_bank_forks = bank_forks.read().unwrap(); + let parent_bank = r_bank_forks.get(duplicate_slot).unwrap().parent().unwrap(); + (parent_bank.slot(), parent_bank.hash()) + }; + assert_eq!( + heaviest_subtree_fork_choice.best_overall_slot(), + (duplicate_ancestor, duplicate_parent_hash) + ); + } + + #[test] + fn test_state_unfrozen_bank_duplicate_then_bank_frozen() { + run_test_state_duplicate_then_bank_frozen(Some(Hash::default())); + } + + #[test] + fn test_state_unreplayed_bank_duplicate_then_bank_frozen() { + run_test_state_duplicate_then_bank_frozen(None); + } + #[test] fn test_state_ancestor_confirmed_descendant_duplicate() { // Common state @@ -705,6 +784,7 @@ mod test { (3, slot3_hash) ); let root = 0; + let mut duplicate_slots_tracker = DuplicateSlotsTracker::default(); let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); // Mark slot 2 as duplicate confirmed @@ -714,6 +794,7 @@ mod test { 2, root, Some(slot2_hash), + &mut duplicate_slots_tracker, &gossip_duplicate_confirmed_slots, &ancestors, &descendants, @@ -727,11 +808,13 @@ mod test { (3, slot3_hash) ); - // Mark 3 as duplicate, should not remove slot 2 from fork choice + // Mark 3 as duplicate, should not remove the duplicate confirmed slot 2 from + // fork choice check_slot_agrees_with_cluster( 3, root, Some(slot3_hash), + &mut duplicate_slots_tracker, &gossip_duplicate_confirmed_slots, &ancestors, &descendants, @@ -764,12 +847,15 @@ mod test { (3, slot3_hash) ); let root = 0; + let mut duplicate_slots_tracker = DuplicateSlotsTracker::default(); let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); - // Mark 2 as duplicate confirmed + + // Mark 2 as duplicate check_slot_agrees_with_cluster( 2, root, Some(bank_forks.read().unwrap().get(2).unwrap().hash()), + &mut duplicate_slots_tracker, &gossip_duplicate_confirmed_slots, &ancestors, &descendants, @@ -790,6 +876,7 @@ mod test { 3, root, Some(slot3_hash), + &mut duplicate_slots_tracker, &gossip_duplicate_confirmed_slots, &ancestors, &descendants, diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 8462ea54a8..1c351987d8 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1358,7 +1358,7 @@ pub mod test { use super::*; use crate::{ cluster_info_vote_listener::VoteTracker, - cluster_slot_state_verifier::GossipDuplicateConfirmedSlots, + cluster_slot_state_verifier::{DuplicateSlotsTracker, GossipDuplicateConfirmedSlots}, cluster_slots::ClusterSlots, fork_choice::SelectVoteAndResetForkResult, heaviest_subtree_fork_choice::{HeaviestSubtreeForkChoice, SlotHashKey}, @@ -1554,6 +1554,7 @@ pub mod test { &AbsRequestSender::default(), None, &mut self.heaviest_subtree_fork_choice, + &mut DuplicateSlotsTracker::default(), &mut GossipDuplicateConfirmedSlots::default(), &mut UnfrozenGossipVerifiedVoteHashes::default(), &mut true, diff --git a/core/src/progress_map.rs b/core/src/progress_map.rs index 7b5efb34ba..876150ae16 100644 --- a/core/src/progress_map.rs +++ b/core/src/progress_map.rs @@ -458,6 +458,7 @@ impl ProgressMap { } } + #[cfg(test)] pub fn is_unconfirmed_duplicate(&self, slot: Slot) -> Option { self.get(&slot).map(|p| { p.duplicate_stats diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 136db5dfc1..0a405804bc 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -341,6 +341,7 @@ impl ReplayStage { let mut partition_exists = false; let mut skipped_slots_info = SkippedSlotsInfo::default(); let mut replay_timing = ReplayTiming::default(); + let mut duplicate_slots_tracker = DuplicateSlotsTracker::default(); let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); let mut unfrozen_gossip_verified_vote_hashes = UnfrozenGossipVerifiedVoteHashes::default(); let mut latest_validator_votes_for_frozen_banks = LatestValidatorVotesForFrozenBanks::default(); @@ -386,6 +387,7 @@ impl ReplayStage { &bank_notification_sender, &rewards_recorder_sender, &subscriptions, + &mut duplicate_slots_tracker, &gossip_duplicate_confirmed_slots, &ancestors, &descendants, @@ -414,6 +416,7 @@ impl ReplayStage { let mut process_gossip_duplicate_confirmed_slots_time = Measure::start("process_gossip_duplicate_confirmed_slots"); Self::process_gossip_duplicate_confirmed_slots( &gossip_duplicate_confirmed_slots_receiver, + &mut duplicate_slots_tracker, &mut gossip_duplicate_confirmed_slots, &bank_forks, &mut progress, @@ -443,6 +446,7 @@ impl ReplayStage { if !tpu_has_bank { Self::process_duplicate_slots( &duplicate_slots_receiver, + &mut duplicate_slots_tracker, &gossip_duplicate_confirmed_slots, &bank_forks, &ancestors, @@ -491,6 +495,7 @@ impl ReplayStage { ); Self::mark_slots_confirmed(&confirmed_forks, &bank_forks, &mut progress, + &mut duplicate_slots_tracker, &ancestors, &descendants, &mut heaviest_subtree_fork_choice); } @@ -584,6 +589,7 @@ impl ReplayStage { &block_commitment_cache, &mut heaviest_subtree_fork_choice, &bank_notification_sender, + &mut duplicate_slots_tracker, &mut gossip_duplicate_confirmed_slots, &mut unfrozen_gossip_verified_vote_hashes, &mut voted_signatures, @@ -913,6 +919,7 @@ impl ReplayStage { // for duplicate slot recovery. fn process_gossip_duplicate_confirmed_slots( gossip_duplicate_confirmed_slots_receiver: &GossipDuplicateConfirmedSlotsReceiver, + duplicate_slots_tracker: &mut DuplicateSlotsTracker, gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots, bank_forks: &RwLock, progress: &mut ProgressMap, @@ -941,6 +948,7 @@ impl ReplayStage { .unwrap() .get(confirmed_slot) .map(|b| b.hash()), + duplicate_slots_tracker, gossip_duplicate_confirmed_slots, ancestors, descendants, @@ -974,6 +982,7 @@ impl ReplayStage { // Checks for and handle forks with duplicate slots. fn process_duplicate_slots( duplicate_slots_receiver: &DuplicateSlotReceiver, + duplicate_slots_tracker: &mut DuplicateSlotsTracker, gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, bank_forks: &RwLock, ancestors: &HashMap>, @@ -981,24 +990,25 @@ impl ReplayStage { progress: &mut ProgressMap, fork_choice: &mut HeaviestSubtreeForkChoice, ) { - let duplicate_slots: Vec = duplicate_slots_receiver.try_iter().collect(); + let new_duplicate_slots: Vec = duplicate_slots_receiver.try_iter().collect(); let (root_slot, bank_hashes) = { let r_bank_forks = bank_forks.read().unwrap(); - let bank_hashes: Vec> = duplicate_slots + let bank_hashes: Vec> = new_duplicate_slots .iter() - .map(|slot| r_bank_forks.get(*slot).map(|bank| bank.hash())) + .map(|duplicate_slot| r_bank_forks.get(*duplicate_slot).map(|bank| bank.hash())) .collect(); (r_bank_forks.root(), bank_hashes) }; - - for (duplicate_slot, bank_hash) in duplicate_slots.into_iter().zip(bank_hashes.into_iter()) + for (duplicate_slot, bank_hash) in + new_duplicate_slots.into_iter().zip(bank_hashes.into_iter()) { // WindowService should only send the signal once per slot check_slot_agrees_with_cluster( duplicate_slot, root_slot, bank_hash, + duplicate_slots_tracker, gossip_duplicate_confirmed_slots, ancestors, descendants, @@ -1247,6 +1257,7 @@ impl ReplayStage { root: Slot, err: &BlockstoreProcessorError, subscriptions: &Arc, + duplicate_slots_tracker: &mut DuplicateSlotsTracker, gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, ancestors: &HashMap>, descendants: &HashMap>, @@ -1290,6 +1301,7 @@ impl ReplayStage { slot, root, Some(bank.hash()), + duplicate_slots_tracker, gossip_duplicate_confirmed_slots, ancestors, descendants, @@ -1319,6 +1331,7 @@ impl ReplayStage { block_commitment_cache: &Arc>, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, bank_notification_sender: &Option, + duplicate_slots_tracker: &mut DuplicateSlotsTracker, gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots, unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes, vote_signatures: &mut Vec, @@ -1367,6 +1380,7 @@ impl ReplayStage { accounts_background_request_sender, highest_confirmed_root, heaviest_subtree_fork_choice, + duplicate_slots_tracker, gossip_duplicate_confirmed_slots, unfrozen_gossip_verified_vote_hashes, has_new_vote_been_rooted, @@ -1660,6 +1674,7 @@ impl ReplayStage { bank_notification_sender: &Option, rewards_recorder_sender: &Option, subscriptions: &Arc, + duplicate_slots_tracker: &mut DuplicateSlotsTracker, gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, ancestors: &HashMap>, descendants: &HashMap>, @@ -1738,6 +1753,7 @@ impl ReplayStage { root_slot, &err, subscriptions, + duplicate_slots_tracker, gossip_duplicate_confirmed_slots, ancestors, descendants, @@ -1774,6 +1790,7 @@ impl ReplayStage { bank.slot(), bank_forks.read().unwrap().root(), Some(bank.hash()), + duplicate_slots_tracker, gossip_duplicate_confirmed_slots, ancestors, descendants, @@ -2310,6 +2327,7 @@ impl ReplayStage { confirmed_forks: &[Slot], bank_forks: &RwLock, progress: &mut ProgressMap, + duplicate_slots_tracker: &mut DuplicateSlotsTracker, ancestors: &HashMap>, descendants: &HashMap>, fork_choice: &mut HeaviestSubtreeForkChoice, @@ -2334,6 +2352,7 @@ impl ReplayStage { *slot, root_slot, bank_hash, + duplicate_slots_tracker, // Don't need to pass the gossip confirmed slots since `slot` // is already marked as confirmed in progress &BTreeMap::new(), @@ -2389,6 +2408,7 @@ impl ReplayStage { accounts_background_request_sender: &AbsRequestSender, highest_confirmed_root: Option, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, + duplicate_slots_tracker: &mut DuplicateSlotsTracker, gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots, unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes, has_new_vote_been_rooted: &mut bool, @@ -2414,6 +2434,10 @@ impl ReplayStage { } progress.handle_new_root(&r_bank_forks); heaviest_subtree_fork_choice.set_root((new_root, r_bank_forks.root_bank().hash())); + let mut slots_ge_root = duplicate_slots_tracker.split_off(&new_root); + // duplicate_slots_tracker now only contains entries >= `new_root` + std::mem::swap(duplicate_slots_tracker, &mut slots_ge_root); + let mut slots_ge_root = gossip_duplicate_confirmed_slots.split_off(&new_root); // gossip_confirmed_slots now only contains entries >= `new_root` std::mem::swap(gossip_duplicate_confirmed_slots, &mut slots_ge_root); @@ -2856,6 +2880,9 @@ mod tests { ForkProgress::new(Hash::default(), None, DuplicateStats::default(), None, 0, 0), ); } + + let mut duplicate_slots_tracker: DuplicateSlotsTracker = + vec![root - 1, root, root + 1].into_iter().collect(); let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = vec![root - 1, root, root + 1] .into_iter() @@ -2875,6 +2902,7 @@ mod tests { &AbsRequestSender::default(), None, &mut heaviest_subtree_fork_choice, + &mut duplicate_slots_tracker, &mut gossip_duplicate_confirmed_slots, &mut unfrozen_gossip_verified_vote_hashes, &mut true, @@ -2884,6 +2912,10 @@ mod tests { assert_eq!(progress.len(), 1); assert!(progress.get(&root).is_some()); // root - 1 is filtered out + assert_eq!( + duplicate_slots_tracker.into_iter().collect::>(), + vec![root, root + 1] + ); assert_eq!( gossip_duplicate_confirmed_slots .keys() @@ -2944,6 +2976,7 @@ mod tests { &AbsRequestSender::default(), Some(confirmed_root), &mut heaviest_subtree_fork_choice, + &mut DuplicateSlotsTracker::default(), &mut GossipDuplicateConfirmedSlots::default(), &mut UnfrozenGossipVerifiedVoteHashes::default(), &mut true, @@ -3220,7 +3253,8 @@ mod tests { 0, err, &subscriptions, - &BTreeMap::new(), + &mut DuplicateSlotsTracker::default(), + &GossipDuplicateConfirmedSlots::default(), &HashMap::new(), &HashMap::new(), &mut progress, @@ -4592,13 +4626,15 @@ mod tests { blockstore.store_duplicate_slot(4, vec![], vec![]).unwrap(); let ancestors = bank_forks.read().unwrap().ancestors(); let descendants = bank_forks.read().unwrap().descendants().clone(); - let mut gossip_duplicate_confirmed_slots = BTreeMap::new(); + let mut duplicate_slots_tracker = DuplicateSlotsTracker::default(); + let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); let bank4_hash = bank_forks.read().unwrap().get(4).unwrap().hash(); assert_ne!(bank4_hash, Hash::default()); check_slot_agrees_with_cluster( 4, bank_forks.read().unwrap().root(), Some(bank4_hash), + &mut duplicate_slots_tracker, &gossip_duplicate_confirmed_slots, &ancestors, &descendants, @@ -4627,6 +4663,7 @@ mod tests { 2, bank_forks.read().unwrap().root(), Some(bank2_hash), + &mut duplicate_slots_tracker, &gossip_duplicate_confirmed_slots, &ancestors, &descendants, @@ -4655,6 +4692,7 @@ mod tests { 4, bank_forks.read().unwrap().root(), Some(bank4_hash), + &mut duplicate_slots_tracker, &gossip_duplicate_confirmed_slots, &ancestors, &descendants,