Account for duplicate before a bank is frozen or replayed (#17866)

This commit is contained in:
carllin 2021-06-10 22:28:23 -07:00 committed by GitHub
parent 191519188d
commit afafa624a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 150 additions and 23 deletions

View File

@ -3,8 +3,9 @@ use crate::{
progress_map::ProgressMap,
};
use solana_sdk::{clock::Slot, hash::Hash};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
pub(crate) type DuplicateSlotsTracker = BTreeSet<Slot>;
pub(crate) type GossipDuplicateConfirmedSlots = BTreeMap<Slot, Hash>;
type SlotStateHandler = fn(Slot, &Hash, Option<&Hash>, bool, bool) -> Vec<ResultingStateChange>;
@ -234,10 +235,12 @@ fn apply_state_changes(
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn check_slot_agrees_with_cluster(
slot: Slot,
root: Slot,
frozen_hash: Option<Hash>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
ancestors: &HashMap<Slot, HashSet<Slot>>,
descendants: &HashMap<Slot, HashSet<Slot>>,
@ -258,6 +261,15 @@ pub(crate) fn check_slot_agrees_with_cluster(
return;
}
// Needs to happen before the frozen_hash.is_none() check below to account for duplicate
// signals arriving before the bank is constructed in replay.
if matches!(slot_state_update, SlotStateUpdate::Duplicate) {
// If this slot has already been processed before, return
if !duplicate_slots_tracker.insert(slot) {
return;
}
}
if frozen_hash.is_none() {
// If the bank doesn't even exist in BankForks yet,
// then there's nothing to do as replay of the slot
@ -275,18 +287,7 @@ pub(crate) fn check_slot_agrees_with_cluster(
&frozen_hash,
is_local_replay_duplicate_confirmed,
);
let mut is_slot_duplicate =
progress.is_unconfirmed_duplicate(slot).expect("If the frozen hash exists, then the slot must exist in bank forks and thus in progress map");
if matches!(slot_state_update, SlotStateUpdate::Duplicate) {
if is_slot_duplicate {
// Already processed duplicate signal for this slot, no need to continue
return;
} else {
// Otherwise, mark the slot as duplicate so the appropriate state changes
// will trigger
is_slot_duplicate = true;
}
}
let is_slot_duplicate = duplicate_slots_tracker.contains(&slot);
let is_dead = progress.is_dead(slot).expect("If the frozen hash exists, then the slot must exist in bank forks and thus in progress map");
info!(
@ -687,6 +688,84 @@ mod test {
.unwrap());
}
fn run_test_state_duplicate_then_bank_frozen(initial_bank_hash: Option<Hash>) {
// Common state
let InitialState {
mut heaviest_subtree_fork_choice,
mut progress,
ancestors,
descendants,
bank_forks,
..
} = setup();
// Setup a duplicate slot state transition with the initial bank state of the duplicate slot
// determined by `initial_bank_hash`, which can be:
// 1) A default hash (unfrozen bank),
// 2) None (a slot that hasn't even started replay yet).
let root = 0;
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
let gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
let duplicate_slot = 2;
check_slot_agrees_with_cluster(
duplicate_slot,
root,
initial_bank_hash,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&ancestors,
&descendants,
&mut progress,
&mut heaviest_subtree_fork_choice,
SlotStateUpdate::Duplicate,
);
// Now freeze the bank
let frozen_duplicate_slot_hash = bank_forks
.read()
.unwrap()
.get(duplicate_slot)
.unwrap()
.hash();
check_slot_agrees_with_cluster(
duplicate_slot,
root,
Some(frozen_duplicate_slot_hash),
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&ancestors,
&descendants,
&mut progress,
&mut heaviest_subtree_fork_choice,
SlotStateUpdate::Frozen,
);
// Progress map should have the correct updates, fork choice should mark duplicate
// as unvotable
assert!(progress.is_unconfirmed_duplicate(duplicate_slot).unwrap());
// The ancestor of the duplicate slot should be the best slot now
let (duplicate_ancestor, duplicate_parent_hash) = {
let r_bank_forks = bank_forks.read().unwrap();
let parent_bank = r_bank_forks.get(duplicate_slot).unwrap().parent().unwrap();
(parent_bank.slot(), parent_bank.hash())
};
assert_eq!(
heaviest_subtree_fork_choice.best_overall_slot(),
(duplicate_ancestor, duplicate_parent_hash)
);
}
#[test]
fn test_state_unfrozen_bank_duplicate_then_bank_frozen() {
run_test_state_duplicate_then_bank_frozen(Some(Hash::default()));
}
#[test]
fn test_state_unreplayed_bank_duplicate_then_bank_frozen() {
run_test_state_duplicate_then_bank_frozen(None);
}
#[test]
fn test_state_ancestor_confirmed_descendant_duplicate() {
// Common state
@ -705,6 +784,7 @@ mod test {
(3, slot3_hash)
);
let root = 0;
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
// Mark slot 2 as duplicate confirmed
@ -714,6 +794,7 @@ mod test {
2,
root,
Some(slot2_hash),
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&ancestors,
&descendants,
@ -727,11 +808,13 @@ mod test {
(3, slot3_hash)
);
// Mark 3 as duplicate, should not remove slot 2 from fork choice
// Mark 3 as duplicate, should not remove the duplicate confirmed slot 2 from
// fork choice
check_slot_agrees_with_cluster(
3,
root,
Some(slot3_hash),
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&ancestors,
&descendants,
@ -764,12 +847,15 @@ mod test {
(3, slot3_hash)
);
let root = 0;
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
// Mark 2 as duplicate confirmed
// Mark 2 as duplicate
check_slot_agrees_with_cluster(
2,
root,
Some(bank_forks.read().unwrap().get(2).unwrap().hash()),
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&ancestors,
&descendants,
@ -790,6 +876,7 @@ mod test {
3,
root,
Some(slot3_hash),
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&ancestors,
&descendants,

View File

@ -1358,7 +1358,7 @@ pub mod test {
use super::*;
use crate::{
cluster_info_vote_listener::VoteTracker,
cluster_slot_state_verifier::GossipDuplicateConfirmedSlots,
cluster_slot_state_verifier::{DuplicateSlotsTracker, GossipDuplicateConfirmedSlots},
cluster_slots::ClusterSlots,
fork_choice::SelectVoteAndResetForkResult,
heaviest_subtree_fork_choice::{HeaviestSubtreeForkChoice, SlotHashKey},
@ -1554,6 +1554,7 @@ pub mod test {
&AbsRequestSender::default(),
None,
&mut self.heaviest_subtree_fork_choice,
&mut DuplicateSlotsTracker::default(),
&mut GossipDuplicateConfirmedSlots::default(),
&mut UnfrozenGossipVerifiedVoteHashes::default(),
&mut true,

View File

@ -458,6 +458,7 @@ impl ProgressMap {
}
}
#[cfg(test)]
pub fn is_unconfirmed_duplicate(&self, slot: Slot) -> Option<bool> {
self.get(&slot).map(|p| {
p.duplicate_stats

View File

@ -341,6 +341,7 @@ impl ReplayStage {
let mut partition_exists = false;
let mut skipped_slots_info = SkippedSlotsInfo::default();
let mut replay_timing = ReplayTiming::default();
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
let mut unfrozen_gossip_verified_vote_hashes = UnfrozenGossipVerifiedVoteHashes::default();
let mut latest_validator_votes_for_frozen_banks = LatestValidatorVotesForFrozenBanks::default();
@ -386,6 +387,7 @@ impl ReplayStage {
&bank_notification_sender,
&rewards_recorder_sender,
&subscriptions,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&ancestors,
&descendants,
@ -414,6 +416,7 @@ impl ReplayStage {
let mut process_gossip_duplicate_confirmed_slots_time = Measure::start("process_gossip_duplicate_confirmed_slots");
Self::process_gossip_duplicate_confirmed_slots(
&gossip_duplicate_confirmed_slots_receiver,
&mut duplicate_slots_tracker,
&mut gossip_duplicate_confirmed_slots,
&bank_forks,
&mut progress,
@ -443,6 +446,7 @@ impl ReplayStage {
if !tpu_has_bank {
Self::process_duplicate_slots(
&duplicate_slots_receiver,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&bank_forks,
&ancestors,
@ -491,6 +495,7 @@ impl ReplayStage {
);
Self::mark_slots_confirmed(&confirmed_forks, &bank_forks, &mut progress,
&mut duplicate_slots_tracker,
&ancestors, &descendants, &mut
heaviest_subtree_fork_choice);
}
@ -584,6 +589,7 @@ impl ReplayStage {
&block_commitment_cache,
&mut heaviest_subtree_fork_choice,
&bank_notification_sender,
&mut duplicate_slots_tracker,
&mut gossip_duplicate_confirmed_slots,
&mut unfrozen_gossip_verified_vote_hashes,
&mut voted_signatures,
@ -913,6 +919,7 @@ impl ReplayStage {
// for duplicate slot recovery.
fn process_gossip_duplicate_confirmed_slots(
gossip_duplicate_confirmed_slots_receiver: &GossipDuplicateConfirmedSlotsReceiver,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
bank_forks: &RwLock<BankForks>,
progress: &mut ProgressMap,
@ -941,6 +948,7 @@ impl ReplayStage {
.unwrap()
.get(confirmed_slot)
.map(|b| b.hash()),
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
ancestors,
descendants,
@ -974,6 +982,7 @@ impl ReplayStage {
// Checks for and handle forks with duplicate slots.
fn process_duplicate_slots(
duplicate_slots_receiver: &DuplicateSlotReceiver,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
bank_forks: &RwLock<BankForks>,
ancestors: &HashMap<Slot, HashSet<Slot>>,
@ -981,24 +990,25 @@ impl ReplayStage {
progress: &mut ProgressMap,
fork_choice: &mut HeaviestSubtreeForkChoice,
) {
let duplicate_slots: Vec<Slot> = duplicate_slots_receiver.try_iter().collect();
let new_duplicate_slots: Vec<Slot> = duplicate_slots_receiver.try_iter().collect();
let (root_slot, bank_hashes) = {
let r_bank_forks = bank_forks.read().unwrap();
let bank_hashes: Vec<Option<Hash>> = duplicate_slots
let bank_hashes: Vec<Option<Hash>> = new_duplicate_slots
.iter()
.map(|slot| r_bank_forks.get(*slot).map(|bank| bank.hash()))
.map(|duplicate_slot| r_bank_forks.get(*duplicate_slot).map(|bank| bank.hash()))
.collect();
(r_bank_forks.root(), bank_hashes)
};
for (duplicate_slot, bank_hash) in duplicate_slots.into_iter().zip(bank_hashes.into_iter())
for (duplicate_slot, bank_hash) in
new_duplicate_slots.into_iter().zip(bank_hashes.into_iter())
{
// WindowService should only send the signal once per slot
check_slot_agrees_with_cluster(
duplicate_slot,
root_slot,
bank_hash,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
ancestors,
descendants,
@ -1247,6 +1257,7 @@ impl ReplayStage {
root: Slot,
err: &BlockstoreProcessorError,
subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
ancestors: &HashMap<Slot, HashSet<Slot>>,
descendants: &HashMap<Slot, HashSet<Slot>>,
@ -1290,6 +1301,7 @@ impl ReplayStage {
slot,
root,
Some(bank.hash()),
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
ancestors,
descendants,
@ -1319,6 +1331,7 @@ impl ReplayStage {
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
bank_notification_sender: &Option<BankNotificationSender>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
vote_signatures: &mut Vec<Signature>,
@ -1367,6 +1380,7 @@ impl ReplayStage {
accounts_background_request_sender,
highest_confirmed_root,
heaviest_subtree_fork_choice,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
unfrozen_gossip_verified_vote_hashes,
has_new_vote_been_rooted,
@ -1660,6 +1674,7 @@ impl ReplayStage {
bank_notification_sender: &Option<BankNotificationSender>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
ancestors: &HashMap<Slot, HashSet<Slot>>,
descendants: &HashMap<Slot, HashSet<Slot>>,
@ -1738,6 +1753,7 @@ impl ReplayStage {
root_slot,
&err,
subscriptions,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
ancestors,
descendants,
@ -1774,6 +1790,7 @@ impl ReplayStage {
bank.slot(),
bank_forks.read().unwrap().root(),
Some(bank.hash()),
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
ancestors,
descendants,
@ -2310,6 +2327,7 @@ impl ReplayStage {
confirmed_forks: &[Slot],
bank_forks: &RwLock<BankForks>,
progress: &mut ProgressMap,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
ancestors: &HashMap<Slot, HashSet<Slot>>,
descendants: &HashMap<Slot, HashSet<Slot>>,
fork_choice: &mut HeaviestSubtreeForkChoice,
@ -2334,6 +2352,7 @@ impl ReplayStage {
*slot,
root_slot,
bank_hash,
duplicate_slots_tracker,
// Don't need to pass the gossip confirmed slots since `slot`
// is already marked as confirmed in progress
&BTreeMap::new(),
@ -2389,6 +2408,7 @@ impl ReplayStage {
accounts_background_request_sender: &AbsRequestSender,
highest_confirmed_root: Option<Slot>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
has_new_vote_been_rooted: &mut bool,
@ -2414,6 +2434,10 @@ impl ReplayStage {
}
progress.handle_new_root(&r_bank_forks);
heaviest_subtree_fork_choice.set_root((new_root, r_bank_forks.root_bank().hash()));
let mut slots_ge_root = duplicate_slots_tracker.split_off(&new_root);
// duplicate_slots_tracker now only contains entries >= `new_root`
std::mem::swap(duplicate_slots_tracker, &mut slots_ge_root);
let mut slots_ge_root = gossip_duplicate_confirmed_slots.split_off(&new_root);
// gossip_confirmed_slots now only contains entries >= `new_root`
std::mem::swap(gossip_duplicate_confirmed_slots, &mut slots_ge_root);
@ -2856,6 +2880,9 @@ mod tests {
ForkProgress::new(Hash::default(), None, DuplicateStats::default(), None, 0, 0),
);
}
let mut duplicate_slots_tracker: DuplicateSlotsTracker =
vec![root - 1, root, root + 1].into_iter().collect();
let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots =
vec![root - 1, root, root + 1]
.into_iter()
@ -2875,6 +2902,7 @@ mod tests {
&AbsRequestSender::default(),
None,
&mut heaviest_subtree_fork_choice,
&mut duplicate_slots_tracker,
&mut gossip_duplicate_confirmed_slots,
&mut unfrozen_gossip_verified_vote_hashes,
&mut true,
@ -2884,6 +2912,10 @@ mod tests {
assert_eq!(progress.len(), 1);
assert!(progress.get(&root).is_some());
// root - 1 is filtered out
assert_eq!(
duplicate_slots_tracker.into_iter().collect::<Vec<Slot>>(),
vec![root, root + 1]
);
assert_eq!(
gossip_duplicate_confirmed_slots
.keys()
@ -2944,6 +2976,7 @@ mod tests {
&AbsRequestSender::default(),
Some(confirmed_root),
&mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsTracker::default(),
&mut GossipDuplicateConfirmedSlots::default(),
&mut UnfrozenGossipVerifiedVoteHashes::default(),
&mut true,
@ -3220,7 +3253,8 @@ mod tests {
0,
err,
&subscriptions,
&BTreeMap::new(),
&mut DuplicateSlotsTracker::default(),
&GossipDuplicateConfirmedSlots::default(),
&HashMap::new(),
&HashMap::new(),
&mut progress,
@ -4592,13 +4626,15 @@ mod tests {
blockstore.store_duplicate_slot(4, vec![], vec![]).unwrap();
let ancestors = bank_forks.read().unwrap().ancestors();
let descendants = bank_forks.read().unwrap().descendants().clone();
let mut gossip_duplicate_confirmed_slots = BTreeMap::new();
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
let bank4_hash = bank_forks.read().unwrap().get(4).unwrap().hash();
assert_ne!(bank4_hash, Hash::default());
check_slot_agrees_with_cluster(
4,
bank_forks.read().unwrap().root(),
Some(bank4_hash),
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&ancestors,
&descendants,
@ -4627,6 +4663,7 @@ mod tests {
2,
bank_forks.read().unwrap().root(),
Some(bank2_hash),
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&ancestors,
&descendants,
@ -4655,6 +4692,7 @@ mod tests {
4,
bank_forks.read().unwrap().root(),
Some(bank4_hash),
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&ancestors,
&descendants,