Add EpochSlots frozen state transition (#19112)

This commit is contained in:
carllin 2021-08-13 14:21:52 -07:00 committed by GitHub
parent 176036aa58
commit 22674000bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 902 additions and 118 deletions

View File

@ -1367,7 +1367,7 @@ mod test {
// Simulate Replay dumping this slot
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
duplicate_slots_to_repair.insert((dead_slot, Hash::new_unique()));
duplicate_slots_to_repair.insert(dead_slot, Hash::new_unique());
ReplayStage::dump_then_repair_correct_slots(
&mut duplicate_slots_to_repair,
&mut bank_forks.read().unwrap().ancestors(),

File diff suppressed because it is too large Load Diff

View File

@ -317,7 +317,7 @@ impl ReplayStage {
vote_tracker: Arc<VoteTracker>,
cluster_slots: Arc<ClusterSlots>,
retransmit_slots_sender: RetransmitSlotsSender,
_duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver,
epoch_slots_frozen_receiver: DuplicateSlotsResetReceiver,
replay_vote_sender: ReplayVoteSender,
gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
@ -374,7 +374,8 @@ impl ReplayStage {
let mut replay_timing = ReplayTiming::default();
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = GossipDuplicateConfirmedSlots::default();
let mut duplicate_slots_to_repair = HashSet::new();
let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots = EpochSlotsFrozenSlots::default();
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes = UnfrozenGossipVerifiedVoteHashes::default();
let mut latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks = LatestValidatorVotesForFrozenBanks::default();
let mut voted_signatures = Vec::new();
@ -421,6 +422,7 @@ impl ReplayStage {
&rpc_subscriptions,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&mut epoch_slots_frozen_slots,
&mut unfrozen_gossip_verified_vote_hashes,
&mut latest_validator_votes_for_frozen_banks,
&cluster_slots_update_sender,
@ -432,6 +434,24 @@ impl ReplayStage {
let forks_root = bank_forks.read().unwrap().root();
// Reset any dead slots that have been frozen by a sufficient portion of
// the network. Signalled by repair_service.
let mut purge_dead_slots_time = Measure::start("purge_dead_slots");
Self::process_epoch_slots_frozen_dead_slots(
&my_pubkey,
&blockstore,
&epoch_slots_frozen_receiver,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&mut epoch_slots_frozen_slots,
&mut progress,
&mut heaviest_subtree_fork_choice,
&bank_forks,
&mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender
);
purge_dead_slots_time.stop();
// Check for any newly confirmed slots detected from gossip.
let mut process_gossip_duplicate_confirmed_slots_time = Measure::start("process_gossip_duplicate_confirmed_slots");
Self::process_gossip_duplicate_confirmed_slots(
@ -439,6 +459,7 @@ impl ReplayStage {
&blockstore,
&mut duplicate_slots_tracker,
&mut gossip_duplicate_confirmed_slots,
&mut epoch_slots_frozen_slots,
&bank_forks,
&mut progress,
&mut heaviest_subtree_fork_choice,
@ -470,6 +491,7 @@ impl ReplayStage {
&duplicate_slots_receiver,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&mut epoch_slots_frozen_slots,
&bank_forks,
&mut progress,
&mut heaviest_subtree_fork_choice,
@ -516,7 +538,7 @@ impl ReplayStage {
&bank_forks,
);
Self::mark_slots_confirmed(&confirmed_forks, &blockstore, &bank_forks, &mut progress, &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender);
Self::mark_slots_confirmed(&confirmed_forks, &blockstore, &bank_forks, &mut progress, &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut epoch_slots_frozen_slots, &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender);
}
compute_slot_stats_time.stop();
@ -618,6 +640,7 @@ impl ReplayStage {
&mut has_new_vote_been_rooted,
&mut replay_timing,
&voting_sender,
&mut epoch_slots_frozen_slots,
);
};
voting_time.stop();
@ -875,7 +898,7 @@ impl ReplayStage {
// TODO: handle if alternate version of descendant also got confirmed after ancestor was
// confirmed, what happens then? Should probably keep track of purged list and skip things
// in `duplicate_slots_to_repair` that have already been purged. Add test.
duplicate_slots_to_repair.retain(|(duplicate_slot, correct_hash)| {
duplicate_slots_to_repair.retain(|duplicate_slot, correct_hash| {
// Should not purge duplicate slots if there is currently a poh bank building
// on top of that slot, as BankingStage might still be referencing/touching that state
// concurrently.
@ -934,6 +957,10 @@ impl ReplayStage {
bank_forks,
blockstore,
);
warn!(
"Notifying repair service to repair duplicate slot: {}",
*duplicate_slot,
);
true
// TODO: Send signal to repair to repair the correct version of
// `duplicate_slot` with hash == `correct_hash`
@ -952,6 +979,58 @@ impl ReplayStage {
});
}
/// Drains every batch currently queued on `epoch_slots_frozen_receiver` and reports each
/// `(slot, hash)` pair it carries to `check_slot_agrees_with_cluster` as a
/// `SlotStateUpdate::EpochSlotsFrozen` update.
///
/// The bank forks root is read once, before draining, and reused for every update.
/// When `progress.is_dead` yields `None` for a slot, the slot is reported as not dead;
/// when `bank_forks` has no bank for a slot, no local bank hash is reported.
#[allow(clippy::too_many_arguments)]
fn process_epoch_slots_frozen_dead_slots(
    pubkey: &Pubkey,
    blockstore: &Blockstore,
    epoch_slots_frozen_receiver: &DuplicateSlotsResetReceiver,
    duplicate_slots_tracker: &mut DuplicateSlotsTracker,
    gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
    epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
    progress: &mut ProgressMap,
    fork_choice: &mut HeaviestSubtreeForkChoice,
    bank_forks: &RwLock<BankForks>,
    duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
    ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
) {
    let root = bank_forks.read().unwrap().root();
    // `try_iter` does not block: only batches already sitting in the channel are handled.
    for frozen_batch in epoch_slots_frozen_receiver.try_iter() {
        warn!(
            "{} ReplayStage notified of epoch slots duplicate frozen dead slots: {:?}",
            pubkey, frozen_batch
        );
        for (slot, frozen_hash) in frozen_batch {
            // Lookups handed to the state constructor as callbacks.
            let is_slot_dead = || progress.is_dead(slot).unwrap_or(false);
            let local_bank_hash = || {
                bank_forks
                    .read()
                    .unwrap()
                    .get(slot)
                    .map(|bank| bank.hash())
            };
            let state_update =
                SlotStateUpdate::EpochSlotsFrozen(EpochSlotsFrozenState::new_from_state(
                    slot,
                    frozen_hash,
                    gossip_duplicate_confirmed_slots,
                    fork_choice,
                    is_slot_dead,
                    local_bank_hash,
                ));
            check_slot_agrees_with_cluster(
                slot,
                root,
                blockstore,
                duplicate_slots_tracker,
                epoch_slots_frozen_slots,
                fork_choice,
                duplicate_slots_to_repair,
                ancestor_hashes_replay_update_sender,
                state_update,
            );
        }
    }
}
fn purge_unconfirmed_duplicate_slot(
duplicate_slot: Slot,
ancestors: &mut HashMap<Slot, HashSet<Slot>>,
@ -1077,11 +1156,13 @@ impl ReplayStage {
// optimistic and in the future, duplicate slot confirmations on the exact
// single slots and does not account for votes on their descendants. Used solely
// for duplicate slot recovery.
#[allow(clippy::too_many_arguments)]
fn process_gossip_duplicate_confirmed_slots(
gossip_duplicate_confirmed_slots_receiver: &GossipDuplicateConfirmedSlotsReceiver,
blockstore: &Blockstore,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
bank_forks: &RwLock<BankForks>,
progress: &mut ProgressMap,
fork_choice: &mut HeaviestSubtreeForkChoice,
@ -1111,6 +1192,7 @@ impl ReplayStage {
root,
blockstore,
duplicate_slots_tracker,
epoch_slots_frozen_slots,
fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
@ -1140,11 +1222,13 @@ impl ReplayStage {
}
// Checks for and handle forks with duplicate slots.
#[allow(clippy::too_many_arguments)]
fn process_duplicate_slots(
blockstore: &Blockstore,
duplicate_slots_receiver: &DuplicateSlotReceiver,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
bank_forks: &RwLock<BankForks>,
progress: &mut ProgressMap,
fork_choice: &mut HeaviestSubtreeForkChoice,
@ -1177,6 +1261,7 @@ impl ReplayStage {
root_slot,
blockstore,
duplicate_slots_tracker,
epoch_slots_frozen_slots,
fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
@ -1425,6 +1510,7 @@ impl ReplayStage {
rpc_subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
progress: &mut ProgressMap,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
@ -1468,12 +1554,14 @@ impl ReplayStage {
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
heaviest_subtree_fork_choice,
epoch_slots_frozen_slots,
);
check_slot_agrees_with_cluster(
slot,
root,
blockstore,
duplicate_slots_tracker,
epoch_slots_frozen_slots,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
@ -1507,6 +1595,7 @@ impl ReplayStage {
has_new_vote_been_rooted: &mut bool,
replay_timing: &mut ReplayTiming,
voting_sender: &Sender<VoteOp>,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
) {
if bank.is_empty() {
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
@ -1556,6 +1645,7 @@ impl ReplayStage {
unfrozen_gossip_verified_vote_hashes,
has_new_vote_been_rooted,
vote_signatures,
epoch_slots_frozen_slots,
);
rpc_subscriptions.notify_roots(rooted_slots);
if let Some(sender) = bank_notification_sender {
@ -1874,6 +1964,7 @@ impl ReplayStage {
rpc_subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks,
cluster_slots_update_sender: &ClusterSlotsUpdateSender,
@ -1943,6 +2034,7 @@ impl ReplayStage {
rpc_subscriptions,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
epoch_slots_frozen_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
@ -1993,12 +2085,14 @@ impl ReplayStage {
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
heaviest_subtree_fork_choice,
epoch_slots_frozen_slots,
);
check_slot_agrees_with_cluster(
bank.slot(),
bank_forks.read().unwrap().root(),
blockstore,
duplicate_slots_tracker,
epoch_slots_frozen_slots,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
@ -2519,6 +2613,7 @@ impl ReplayStage {
progress: &mut ProgressMap,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
fork_choice: &mut HeaviestSubtreeForkChoice,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
) {
@ -2544,6 +2639,7 @@ impl ReplayStage {
root_slot,
blockstore,
duplicate_slots_tracker,
epoch_slots_frozen_slots,
fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
@ -2600,6 +2696,7 @@ impl ReplayStage {
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
has_new_vote_been_rooted: &mut bool,
voted_signatures: &mut Vec<Signature>,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
) {
bank_forks.write().unwrap().set_root(
new_root,
@ -2630,6 +2727,9 @@ impl ReplayStage {
std::mem::swap(gossip_duplicate_confirmed_slots, &mut slots_ge_root);
unfrozen_gossip_verified_vote_hashes.set_root(new_root);
let mut slots_ge_root = epoch_slots_frozen_slots.split_off(&new_root);
// After the swap below, epoch_slots_frozen_slots only contains entries >= `new_root`
std::mem::swap(epoch_slots_frozen_slots, &mut slots_ge_root);
}
fn generate_new_bank_forks(
@ -3066,6 +3166,10 @@ pub mod tests {
.map(|s| (s, HashMap::new()))
.collect(),
};
let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots = vec![root - 1, root, root + 1]
.into_iter()
.map(|slot| (slot, Hash::default()))
.collect();
ReplayStage::handle_new_root(
root,
&bank_forks,
@ -3078,6 +3182,7 @@ pub mod tests {
&mut unfrozen_gossip_verified_vote_hashes,
&mut true,
&mut Vec::new(),
&mut epoch_slots_frozen_slots,
);
assert_eq!(bank_forks.read().unwrap().root(), root);
assert_eq!(progress.len(), 1);
@ -3102,6 +3207,13 @@ pub mod tests {
.collect::<Vec<Slot>>(),
vec![root, root + 1]
);
assert_eq!(
epoch_slots_frozen_slots
.into_iter()
.map(|(slot, _hash)| slot)
.collect::<Vec<Slot>>(),
vec![root, root + 1]
);
}
#[test]
@ -3149,6 +3261,7 @@ pub mod tests {
&mut UnfrozenGossipVerifiedVoteHashes::default(),
&mut true,
&mut Vec::new(),
&mut EpochSlotsFrozenSlots::default(),
);
assert_eq!(bank_forks.read().unwrap().root(), root);
assert!(bank_forks.read().unwrap().get(confirmed_root).is_some());
@ -3434,7 +3547,8 @@ pub mod tests {
err,
&rpc_subscriptions,
&mut DuplicateSlotsTracker::default(),
&GossipDuplicateConfirmedSlots::default(),
&GossipDuplicateConfirmedSlots::new(),
&mut EpochSlotsFrozenSlots::default(),
&mut progress,
&mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
@ -4867,6 +4981,7 @@ pub mod tests {
blockstore.store_duplicate_slot(4, vec![], vec![]).unwrap();
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
let mut epoch_slots_frozen_slots = EpochSlotsFrozenSlots::default();
let bank4_hash = bank_forks.read().unwrap().bank_hash(4).unwrap();
assert_ne!(bank4_hash, Hash::default());
let duplicate_state = DuplicateState::new_from_state(
@ -4883,6 +4998,7 @@ pub mod tests {
bank_forks.read().unwrap().root(),
&blockstore,
&mut duplicate_slots_tracker,
&mut epoch_slots_frozen_slots,
&mut vote_simulator.heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
@ -4915,6 +5031,7 @@ pub mod tests {
bank_forks.read().unwrap().root(),
&blockstore,
&mut duplicate_slots_tracker,
&mut epoch_slots_frozen_slots,
&mut vote_simulator.heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
@ -4936,7 +5053,7 @@ pub mod tests {
// If slot 4 is marked as confirmed, then this confirms slot 2 and 4, and
// then slot 4 is now the heaviest bank again
let mut duplicate_slots_to_repair = HashSet::new();
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
gossip_duplicate_confirmed_slots.insert(4, bank4_hash);
let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state(
bank4_hash,
@ -4948,6 +5065,7 @@ pub mod tests {
bank_forks.read().unwrap().root(),
&blockstore,
&mut duplicate_slots_tracker,
&mut epoch_slots_frozen_slots,
&mut vote_simulator.heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
@ -4996,8 +5114,9 @@ pub mod tests {
// Insert different versions of both 1 and 2. Both slots 1 and 2 should
// then be purged
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
duplicate_slots_to_repair.insert((1, Hash::new_unique()));
duplicate_slots_to_repair.insert((2, Hash::new_unique()));
duplicate_slots_to_repair.insert(1, Hash::new_unique());
duplicate_slots_to_repair.insert(2, Hash::new_unique());
ReplayStage::dump_then_repair_correct_slots(
&mut duplicate_slots_to_repair,
&mut ancestors,
@ -5079,6 +5198,7 @@ pub mod tests {
gossip_duplicate_confirmed_slots.insert(2, duplicate_confirmed_bank2_hash);
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
let mut epoch_slots_frozen_slots = EpochSlotsFrozenSlots::default();
// Mark fork choice branch as invalid so select forks below doesn't panic
// on a nonexistent `heaviest_bank_on_same_fork` after we dump the duplicate fork.
@ -5094,12 +5214,16 @@ pub mod tests {
bank_forks.read().unwrap().root(),
blockstore,
&mut duplicate_slots_tracker,
&mut epoch_slots_frozen_slots,
heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
);
assert!(duplicate_slots_to_repair.contains(&(2, duplicate_confirmed_bank2_hash)));
assert_eq!(
*duplicate_slots_to_repair.get(&2).unwrap(),
duplicate_confirmed_bank2_hash
);
let mut ancestors = bank_forks.read().unwrap().ancestors();
let mut descendants = bank_forks.read().unwrap().descendants().clone();
let old_descendants_of_2 = descendants.get(&2).unwrap().clone();

View File

@ -1,6 +1,8 @@
use crate::{
cluster_info_vote_listener::VoteTracker,
cluster_slot_state_verifier::{DuplicateSlotsTracker, GossipDuplicateConfirmedSlots},
cluster_slot_state_verifier::{
DuplicateSlotsTracker, EpochSlotsFrozenSlots, GossipDuplicateConfirmedSlots,
},
cluster_slots::ClusterSlots,
consensus::Tower,
fork_choice::SelectVoteAndResetForkResult,
@ -212,6 +214,7 @@ impl VoteSimulator {
&mut UnfrozenGossipVerifiedVoteHashes::default(),
&mut true,
&mut Vec::new(),
&mut EpochSlotsFrozenSlots::default(),
)
}

View File

@ -2008,8 +2008,13 @@ fn test_snapshots_restart_validity() {
#[allow(unused_attributes)]
#[ignore]
fn test_fail_entry_verification_leader() {
let (cluster, _) =
test_faulty_node(BroadcastStageType::FailEntryVerification, vec![60, 50, 60]);
let leader_stake = (DUPLICATE_THRESHOLD * 100.0) as u64 + 1;
let validator_stake1 = (100 - leader_stake) / 2;
let validator_stake2 = 100 - leader_stake - validator_stake1;
let (cluster, _) = test_faulty_node(
BroadcastStageType::FailEntryVerification,
vec![leader_stake, validator_stake1, validator_stake2],
);
cluster.check_for_new_roots(
16,
"test_fail_entry_verification_leader",