Exit when stuck in an unrecoverable repair/purge loop (#28596)

* Exit when stuck in an unrecoverable repair/purge loop

* add tests
This commit is contained in:
Ashwin Sekar 2022-10-27 20:06:06 -07:00 committed by GitHub
parent 37fb603fc0
commit ae557a9eb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 6 deletions

View File

@ -760,7 +760,7 @@ mod test {
use { use {
super::*, super::*,
crate::{ crate::{
cluster_slot_state_verifier::DuplicateSlotsToRepair, cluster_slot_state_verifier::{DuplicateSlotsToRepair, PurgeRepairSlotCounter},
repair_service::DuplicateSlotsResetReceiver, repair_service::DuplicateSlotsResetReceiver,
replay_stage::{ replay_stage::{
tests::{replay_blockstore_components, ReplayBlockstoreComponents}, tests::{replay_blockstore_components, ReplayBlockstoreComponents},
@ -1542,6 +1542,7 @@ mod test {
&bank_forks, &bank_forks,
&requester_blockstore, &requester_blockstore,
None, None,
&mut PurgeRepairSlotCounter::default(),
); );
// Simulate making a request // Simulate making a request

View File

@ -11,6 +11,7 @@ use {
pub(crate) type DuplicateSlotsTracker = BTreeSet<Slot>; pub(crate) type DuplicateSlotsTracker = BTreeSet<Slot>;
pub(crate) type DuplicateSlotsToRepair = HashMap<Slot, Hash>; pub(crate) type DuplicateSlotsToRepair = HashMap<Slot, Hash>;
/// Per-slot count of how many times replay has gone through the purge/repair
/// cycle for a duplicate slot. The replay stage bumps the entry each time it
/// notifies repair for the slot, and `apply_state_changes` removes the entry
/// together with the slot's `duplicate_slots_to_repair` entry.
pub(crate) type PurgeRepairSlotCounter = BTreeMap<Slot, usize>;
pub(crate) type EpochSlotsFrozenSlots = BTreeMap<Slot, Hash>; pub(crate) type EpochSlotsFrozenSlots = BTreeMap<Slot, Hash>;
pub(crate) type GossipDuplicateConfirmedSlots = BTreeMap<Slot, Hash>; pub(crate) type GossipDuplicateConfirmedSlots = BTreeMap<Slot, Hash>;
@ -694,6 +695,7 @@ fn apply_state_changes(
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
blockstore: &Blockstore, blockstore: &Blockstore,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
state_changes: Vec<ResultingStateChange>, state_changes: Vec<ResultingStateChange>,
) { ) {
// Handle cases where the bank is frozen, but not duplicate confirmed // Handle cases where the bank is frozen, but not duplicate confirmed
@ -728,6 +730,7 @@ fn apply_state_changes(
) )
.unwrap(); .unwrap();
duplicate_slots_to_repair.remove(&slot); duplicate_slots_to_repair.remove(&slot);
purge_repair_slot_counter.remove(&slot);
} }
ResultingStateChange::SendAncestorHashesReplayUpdate(ancestor_hashes_replay_update) => { ResultingStateChange::SendAncestorHashesReplayUpdate(ancestor_hashes_replay_update) => {
let _ = ancestor_hashes_replay_update_sender.send(ancestor_hashes_replay_update); let _ = ancestor_hashes_replay_update_sender.send(ancestor_hashes_replay_update);
@ -750,6 +753,7 @@ pub(crate) fn check_slot_agrees_with_cluster(
fork_choice: &mut HeaviestSubtreeForkChoice, fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
slot_state_update: SlotStateUpdate, slot_state_update: SlotStateUpdate,
) { ) {
info!( info!(
@ -839,6 +843,7 @@ pub(crate) fn check_slot_agrees_with_cluster(
duplicate_slots_to_repair, duplicate_slots_to_repair,
blockstore, blockstore,
ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
state_changes, state_changes,
); );
} }
@ -1396,6 +1401,7 @@ mod test {
} = setup(); } = setup();
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default(); let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
// MarkSlotDuplicate should mark progress map and remove // MarkSlotDuplicate should mark progress map and remove
// the slot from fork choice // the slot from fork choice
@ -1414,6 +1420,7 @@ mod test {
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&blockstore, &blockstore,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
vec![ResultingStateChange::MarkSlotDuplicate(duplicate_slot_hash)], vec![ResultingStateChange::MarkSlotDuplicate(duplicate_slot_hash)],
); );
assert!(!heaviest_subtree_fork_choice assert!(!heaviest_subtree_fork_choice
@ -1436,6 +1443,7 @@ mod test {
); );
} }
assert!(duplicate_slots_to_repair.is_empty()); assert!(duplicate_slots_to_repair.is_empty());
assert!(purge_repair_slot_counter.is_empty());
// Simulate detecting another hash that is the correct version, // Simulate detecting another hash that is the correct version,
// RepairDuplicateConfirmedVersion should add the slot to repair // RepairDuplicateConfirmedVersion should add the slot to repair
@ -1448,6 +1456,7 @@ mod test {
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&blockstore, &blockstore,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
vec![ResultingStateChange::RepairDuplicateConfirmedVersion( vec![ResultingStateChange::RepairDuplicateConfirmedVersion(
correct_hash, correct_hash,
)], )],
@ -1457,6 +1466,7 @@ mod test {
*duplicate_slots_to_repair.get(&duplicate_slot).unwrap(), *duplicate_slots_to_repair.get(&duplicate_slot).unwrap(),
correct_hash correct_hash
); );
assert!(purge_repair_slot_counter.is_empty());
} }
#[test] #[test]
@ -1470,6 +1480,7 @@ mod test {
} = setup(); } = setup();
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default(); let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
let duplicate_slot = bank_forks.read().unwrap().root() + 1; let duplicate_slot = bank_forks.read().unwrap().root() + 1;
let duplicate_slot_hash = bank_forks let duplicate_slot_hash = bank_forks
@ -1490,6 +1501,7 @@ mod test {
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&blockstore, &blockstore,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
vec![ResultingStateChange::BankFrozen(duplicate_slot_hash)], vec![ResultingStateChange::BankFrozen(duplicate_slot_hash)],
); );
assert_eq!( assert_eq!(
@ -1513,6 +1525,7 @@ mod test {
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&blockstore, &blockstore,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
vec![ResultingStateChange::BankFrozen(new_bank_hash)], vec![ResultingStateChange::BankFrozen(new_bank_hash)],
); );
assert_eq!( assert_eq!(
@ -1535,6 +1548,7 @@ mod test {
} = setup(); } = setup();
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default(); let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
let duplicate_slot = bank_forks.read().unwrap().root() + 1; let duplicate_slot = bank_forks.read().unwrap().root() + 1;
let our_duplicate_slot_hash = bank_forks let our_duplicate_slot_hash = bank_forks
@ -1546,6 +1560,7 @@ mod test {
// Setup and check the state that is about to change. // Setup and check the state that is about to change.
duplicate_slots_to_repair.insert(duplicate_slot, Hash::new_unique()); duplicate_slots_to_repair.insert(duplicate_slot, Hash::new_unique());
purge_repair_slot_counter.insert(duplicate_slot, 1);
assert!(blockstore.get_bank_hash(duplicate_slot).is_none()); assert!(blockstore.get_bank_hash(duplicate_slot).is_none());
assert!(!blockstore.is_duplicate_confirmed(duplicate_slot)); assert!(!blockstore.is_duplicate_confirmed(duplicate_slot));
@ -1553,6 +1568,7 @@ mod test {
// 1) Re-enable fork choice // 1) Re-enable fork choice
// 2) Clear any pending repairs from `duplicate_slots_to_repair` since we have the // 2) Clear any pending repairs from `duplicate_slots_to_repair` since we have the
// right version now // right version now
// 3) Clear the slot from `purge_repair_slot_counter`
// 3) Set the status to duplicate confirmed in Blockstore // 4) Set the status to duplicate confirmed in Blockstore
let mut state_changes = vec![ResultingStateChange::DuplicateConfirmedSlotMatchesCluster( let mut state_changes = vec![ResultingStateChange::DuplicateConfirmedSlotMatchesCluster(
our_duplicate_slot_hash, our_duplicate_slot_hash,
@ -1566,6 +1582,7 @@ mod test {
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&blockstore, &blockstore,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
state_changes, state_changes,
); );
for child_slot in descendants for child_slot in descendants
@ -1585,6 +1602,7 @@ mod test {
.is_candidate(&(duplicate_slot, our_duplicate_slot_hash)) .is_candidate(&(duplicate_slot, our_duplicate_slot_hash))
.unwrap()); .unwrap());
assert!(duplicate_slots_to_repair.is_empty()); assert!(duplicate_slots_to_repair.is_empty());
assert!(purge_repair_slot_counter.is_empty());
assert_eq!( assert_eq!(
blockstore.get_bank_hash(duplicate_slot).unwrap(), blockstore.get_bank_hash(duplicate_slot).unwrap(),
our_duplicate_slot_hash our_duplicate_slot_hash
@ -1627,6 +1645,7 @@ mod test {
let gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); let gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
let mut epoch_slots_frozen_slots = EpochSlotsFrozenSlots::default(); let mut epoch_slots_frozen_slots = EpochSlotsFrozenSlots::default();
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default(); let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
let duplicate_slot = 2; let duplicate_slot = 2;
let duplicate_state = DuplicateState::new_from_state( let duplicate_state = DuplicateState::new_from_state(
duplicate_slot, duplicate_slot,
@ -1646,6 +1665,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
assert!(duplicate_slots_tracker.contains(&duplicate_slot)); assert!(duplicate_slots_tracker.contains(&duplicate_slot));
@ -1681,6 +1701,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::BankFrozen(bank_frozen_state), SlotStateUpdate::BankFrozen(bank_frozen_state),
); );
@ -1730,6 +1751,7 @@ mod test {
); );
let root = 0; let root = 0;
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default(); let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
// Mark slot 2 as duplicate confirmed // Mark slot 2 as duplicate confirmed
@ -1751,6 +1773,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
assert!(heaviest_subtree_fork_choice assert!(heaviest_subtree_fork_choice
@ -1788,6 +1811,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
assert!(duplicate_slots_tracker.contains(&3)); assert!(duplicate_slots_tracker.contains(&3));
@ -1837,6 +1861,7 @@ mod test {
let root = 0; let root = 0;
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default(); let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
// Mark 2 as duplicate // Mark 2 as duplicate
let slot2_hash = bank_forks.read().unwrap().get(2).unwrap().hash(); let slot2_hash = bank_forks.read().unwrap().get(2).unwrap().hash();
@ -1858,6 +1883,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
assert!(duplicate_slots_tracker.contains(&2)); assert!(duplicate_slots_tracker.contains(&2));
@ -1893,6 +1919,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
for slot in 0..=3 { for slot in 0..=3 {
@ -1954,6 +1981,7 @@ mod test {
let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
let mut epoch_slots_frozen_slots = EpochSlotsFrozenSlots::default(); let mut epoch_slots_frozen_slots = EpochSlotsFrozenSlots::default();
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default(); let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
// Mark 3 as duplicate confirmed // Mark 3 as duplicate confirmed
gossip_duplicate_confirmed_slots.insert(3, slot3_hash); gossip_duplicate_confirmed_slots.insert(3, slot3_hash);
@ -1973,6 +2001,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
verify_all_slots_duplicate_confirmed(&bank_forks, &heaviest_subtree_fork_choice, 3, true); verify_all_slots_duplicate_confirmed(&bank_forks, &heaviest_subtree_fork_choice, 3, true);
@ -2001,6 +2030,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
assert!(duplicate_slots_tracker.contains(&1)); assert!(duplicate_slots_tracker.contains(&1));
@ -2032,6 +2062,7 @@ mod test {
let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
let mut epoch_slots_frozen_slots = EpochSlotsFrozenSlots::default(); let mut epoch_slots_frozen_slots = EpochSlotsFrozenSlots::default();
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default(); let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
// Mark 3 as only epoch slots frozen, matching our `slot3_hash`, should not duplicate // Mark 3 as only epoch slots frozen, matching our `slot3_hash`, should not duplicate
// confirm the slot // confirm the slot
@ -2055,6 +2086,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
); );
verify_all_slots_duplicate_confirmed( verify_all_slots_duplicate_confirmed(
@ -2084,6 +2116,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
assert_eq!(*epoch_slots_frozen_slots.get(&3).unwrap(), slot3_hash); assert_eq!(*epoch_slots_frozen_slots.get(&3).unwrap(), slot3_hash);
@ -2120,6 +2153,7 @@ mod test {
let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
let mut epoch_slots_frozen_slots = EpochSlotsFrozenSlots::default(); let mut epoch_slots_frozen_slots = EpochSlotsFrozenSlots::default();
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default(); let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
// Mark 3 as only epoch slots frozen with different hash than our // Mark 3 as only epoch slots frozen with different hash than our
// locally replayed `slot3_hash`. This should not duplicate confirm the slot, // locally replayed `slot3_hash`. This should not duplicate confirm the slot,
@ -2145,6 +2179,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
); );
assert_eq!(*duplicate_slots_to_repair.get(&3).unwrap(), mismatched_hash); assert_eq!(*duplicate_slots_to_repair.get(&3).unwrap(), mismatched_hash);
@ -2175,6 +2210,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
assert!(duplicate_slots_to_repair.is_empty()); assert!(duplicate_slots_to_repair.is_empty());

View File

@ -94,6 +94,7 @@ const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000;
// Expect this number to be small enough to minimize thread pool overhead while large enough // Expect this number to be small enough to minimize thread pool overhead while large enough
// to be able to replay all active forks at the same time in most cases. // to be able to replay all active forks at the same time in most cases.
const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4; const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;
// Upper bound on purge/repair attempts for a single duplicate slot. Once the
// per-slot attempt counter exceeds this value the replay stage panics, since
// manual intervention is needed to make progress.
const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10;
lazy_static! { lazy_static! {
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new() static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
@ -463,6 +464,7 @@ impl ReplayStage {
let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = GossipDuplicateConfirmedSlots::default(); let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = GossipDuplicateConfirmedSlots::default();
let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots = EpochSlotsFrozenSlots::default(); let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots = EpochSlotsFrozenSlots::default();
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default(); let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes = UnfrozenGossipVerifiedVoteHashes::default(); let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes = UnfrozenGossipVerifiedVoteHashes::default();
let mut latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks = LatestValidatorVotesForFrozenBanks::default(); let mut latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks = LatestValidatorVotesForFrozenBanks::default();
let mut voted_signatures = Vec::new(); let mut voted_signatures = Vec::new();
@ -535,6 +537,7 @@ impl ReplayStage {
log_messages_bytes_limit, log_messages_bytes_limit,
replay_slots_concurrently, replay_slots_concurrently,
&prioritization_fee_cache, &prioritization_fee_cache,
&mut purge_repair_slot_counter,
); );
replay_active_banks_time.stop(); replay_active_banks_time.stop();
@ -554,7 +557,8 @@ impl ReplayStage {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&bank_forks, &bank_forks,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
); );
purge_dead_slots_time.stop(); purge_dead_slots_time.stop();
@ -571,6 +575,7 @@ impl ReplayStage {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
); );
process_gossip_duplicate_confirmed_slots_time.stop(); process_gossip_duplicate_confirmed_slots_time.stop();
@ -603,6 +608,7 @@ impl ReplayStage {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
); );
} }
process_duplicate_slots_time.stop(); process_duplicate_slots_time.stop();
@ -644,7 +650,7 @@ impl ReplayStage {
&bank_forks, &bank_forks,
); );
Self::mark_slots_confirmed(&confirmed_forks, &blockstore, &bank_forks, &mut progress, &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut epoch_slots_frozen_slots, &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender); Self::mark_slots_confirmed(&confirmed_forks, &blockstore, &bank_forks, &mut progress, &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut epoch_slots_frozen_slots, &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender, &mut purge_repair_slot_counter);
} }
compute_slot_stats_time.stop(); compute_slot_stats_time.stop();
@ -867,7 +873,7 @@ impl ReplayStage {
// //
// Has to be before `maybe_start_leader()`. Otherwise, `ancestors` and `descendants` // Has to be before `maybe_start_leader()`. Otherwise, `ancestors` and `descendants`
// will be outdated, and we cannot assume `poh_bank` will be in either of these maps. // will be outdated, and we cannot assume `poh_bank` will be in either of these maps.
Self::dump_then_repair_correct_slots(&mut duplicate_slots_to_repair, &mut ancestors, &mut descendants, &mut progress, &bank_forks, &blockstore, poh_bank.map(|bank| bank.slot())); Self::dump_then_repair_correct_slots(&mut duplicate_slots_to_repair, &mut ancestors, &mut descendants, &mut progress, &bank_forks, &blockstore, poh_bank.map(|bank| bank.slot()), &mut purge_repair_slot_counter);
dump_then_repair_correct_slots_time.stop(); dump_then_repair_correct_slots_time.stop();
let mut retransmit_not_propagated_time = Measure::start("retransmit_not_propagated_time"); let mut retransmit_not_propagated_time = Measure::start("retransmit_not_propagated_time");
@ -1113,6 +1119,7 @@ impl ReplayStage {
bank_forks: &RwLock<BankForks>, bank_forks: &RwLock<BankForks>,
blockstore: &Blockstore, blockstore: &Blockstore,
poh_bank_slot: Option<Slot>, poh_bank_slot: Option<Slot>,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) { ) {
if duplicate_slots_to_repair.is_empty() { if duplicate_slots_to_repair.is_empty() {
return; return;
@ -1181,9 +1188,16 @@ impl ReplayStage {
bank_forks, bank_forks,
blockstore, blockstore,
); );
let attempt_no = purge_repair_slot_counter
.entry(*duplicate_slot)
.and_modify(|x| *x += 1)
.or_insert(1);
if *attempt_no > MAX_REPAIR_RETRY_LOOP_ATTEMPTS {
panic!("We have tried to repair duplicate slot: {} more than {} times and are unable to freeze a block with bankhash {}, instead we have a block with bankhash {:?}. This is most likely a bug in the runtime. At this point manual intervention is needed to make progress. Exiting", *duplicate_slot, MAX_REPAIR_RETRY_LOOP_ATTEMPTS, *correct_hash, frozen_hash);
}
warn!( warn!(
"Notifying repair service to repair duplicate slot: {}", "Notifying repair service to repair duplicate slot: {}, attempt {}",
*duplicate_slot, *duplicate_slot, *attempt_no,
); );
true true
// TODO: Send signal to repair to repair the correct version of // TODO: Send signal to repair to repair the correct version of
@ -1216,6 +1230,7 @@ impl ReplayStage {
bank_forks: &RwLock<BankForks>, bank_forks: &RwLock<BankForks>,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) { ) {
let root = bank_forks.read().unwrap().root(); let root = bank_forks.read().unwrap().root();
for maybe_purgeable_duplicate_slots in epoch_slots_frozen_receiver.try_iter() { for maybe_purgeable_duplicate_slots in epoch_slots_frozen_receiver.try_iter() {
@ -1249,6 +1264,7 @@ impl ReplayStage {
fork_choice, fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
); );
} }
@ -1392,6 +1408,7 @@ impl ReplayStage {
fork_choice: &mut HeaviestSubtreeForkChoice, fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) { ) {
let root = bank_forks.read().unwrap().root(); let root = bank_forks.read().unwrap().root();
for new_confirmed_slots in gossip_duplicate_confirmed_slots_receiver.try_iter() { for new_confirmed_slots in gossip_duplicate_confirmed_slots_receiver.try_iter() {
@ -1420,6 +1437,7 @@ impl ReplayStage {
fork_choice, fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
} }
@ -1458,6 +1476,7 @@ impl ReplayStage {
fork_choice: &mut HeaviestSubtreeForkChoice, fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) { ) {
let new_duplicate_slots: Vec<Slot> = duplicate_slots_receiver.try_iter().collect(); let new_duplicate_slots: Vec<Slot> = duplicate_slots_receiver.try_iter().collect();
let (root_slot, bank_hashes) = { let (root_slot, bank_hashes) = {
@ -1489,6 +1508,7 @@ impl ReplayStage {
fork_choice, fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
} }
@ -1760,6 +1780,7 @@ impl ReplayStage {
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) { ) {
// Do not remove from progress map when marking dead! Needed by // Do not remove from progress map when marking dead! Needed by
// `process_gossip_duplicate_confirmed_slots()` // `process_gossip_duplicate_confirmed_slots()`
@ -1813,6 +1834,7 @@ impl ReplayStage {
heaviest_subtree_fork_choice, heaviest_subtree_fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
SlotStateUpdate::Dead(dead_state), SlotStateUpdate::Dead(dead_state),
); );
} }
@ -2426,6 +2448,7 @@ impl ReplayStage {
block_metadata_notifier: Option<BlockMetadataNotifierLock>, block_metadata_notifier: Option<BlockMetadataNotifierLock>,
replay_result_vec: &[ReplaySlotFromBlockstore], replay_result_vec: &[ReplaySlotFromBlockstore],
prioritization_fee_cache: &PrioritizationFeeCache, prioritization_fee_cache: &PrioritizationFeeCache,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) -> bool { ) -> bool {
// TODO: See if processing of blockstore replay results and bank completion can be made thread safe. // TODO: See if processing of blockstore replay results and bank completion can be made thread safe.
let mut did_complete_bank = false; let mut did_complete_bank = false;
@ -2456,6 +2479,7 @@ impl ReplayStage {
heaviest_subtree_fork_choice, heaviest_subtree_fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
); );
// If the bank was corrupted, don't try to run the below logic to check if the // If the bank was corrupted, don't try to run the below logic to check if the
// bank is completed // bank is completed
@ -2527,6 +2551,7 @@ impl ReplayStage {
heaviest_subtree_fork_choice, heaviest_subtree_fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
SlotStateUpdate::BankFrozen(bank_frozen_state), SlotStateUpdate::BankFrozen(bank_frozen_state),
); );
if let Some(sender) = bank_notification_sender { if let Some(sender) = bank_notification_sender {
@ -2621,6 +2646,7 @@ impl ReplayStage {
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
replay_slots_concurrently: bool, replay_slots_concurrently: bool,
prioritization_fee_cache: &PrioritizationFeeCache, prioritization_fee_cache: &PrioritizationFeeCache,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) -> bool /* completed a bank */ { ) -> bool /* completed a bank */ {
let active_bank_slots = bank_forks.read().unwrap().active_bank_slots(); let active_bank_slots = bank_forks.read().unwrap().active_bank_slots();
let num_active_banks = active_bank_slots.len(); let num_active_banks = active_bank_slots.len();
@ -2689,6 +2715,7 @@ impl ReplayStage {
block_metadata_notifier, block_metadata_notifier,
&replay_result_vec, &replay_result_vec,
prioritization_fee_cache, prioritization_fee_cache,
purge_repair_slot_counter,
) )
} else { } else {
false false
@ -3228,6 +3255,7 @@ impl ReplayStage {
did_newly_reach_threshold did_newly_reach_threshold
} }
#[allow(clippy::too_many_arguments)]
fn mark_slots_confirmed( fn mark_slots_confirmed(
confirmed_forks: &[(Slot, Hash)], confirmed_forks: &[(Slot, Hash)],
blockstore: &Blockstore, blockstore: &Blockstore,
@ -3238,6 +3266,7 @@ impl ReplayStage {
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots, epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) { ) {
let root_slot = bank_forks.read().unwrap().root(); let root_slot = bank_forks.read().unwrap().root();
for (slot, frozen_hash) in confirmed_forks.iter() { for (slot, frozen_hash) in confirmed_forks.iter() {
@ -3265,6 +3294,7 @@ impl ReplayStage {
fork_choice, fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
} }
@ -4253,6 +4283,7 @@ pub(crate) mod tests {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
); );
} }
@ -5687,6 +5718,7 @@ pub(crate) mod tests {
// Mark 5 as duplicate // Mark 5 as duplicate
blockstore.store_duplicate_slot(5, vec![], vec![]).unwrap(); blockstore.store_duplicate_slot(5, vec![], vec![]).unwrap();
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default(); let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default(); let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
let mut epoch_slots_frozen_slots = EpochSlotsFrozenSlots::default(); let mut epoch_slots_frozen_slots = EpochSlotsFrozenSlots::default();
let bank5_hash = bank_forks.read().unwrap().bank_hash(5).unwrap(); let bank5_hash = bank_forks.read().unwrap().bank_hash(5).unwrap();
@ -5709,6 +5741,7 @@ pub(crate) mod tests {
&mut vote_simulator.heaviest_subtree_fork_choice, &mut vote_simulator.heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
@ -5741,6 +5774,7 @@ pub(crate) mod tests {
&mut vote_simulator.heaviest_subtree_fork_choice, &mut vote_simulator.heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
// The confirmed hash is detected in `progress`, which means // The confirmed hash is detected in `progress`, which means
@ -5841,6 +5875,7 @@ pub(crate) mod tests {
&mut vote_simulator.heaviest_subtree_fork_choice, &mut vote_simulator.heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
@ -5874,6 +5909,7 @@ pub(crate) mod tests {
&mut vote_simulator.heaviest_subtree_fork_choice, &mut vote_simulator.heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
@ -5908,6 +5944,7 @@ pub(crate) mod tests {
&mut vote_simulator.heaviest_subtree_fork_choice, &mut vote_simulator.heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
// The confirmed hash is detected in `progress`, which means // The confirmed hash is detected in `progress`, which means
@ -5955,6 +5992,7 @@ pub(crate) mod tests {
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default(); let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
duplicate_slots_to_repair.insert(1, Hash::new_unique()); duplicate_slots_to_repair.insert(1, Hash::new_unique());
duplicate_slots_to_repair.insert(2, Hash::new_unique()); duplicate_slots_to_repair.insert(2, Hash::new_unique());
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
ReplayStage::dump_then_repair_correct_slots( ReplayStage::dump_then_repair_correct_slots(
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
@ -5964,6 +6002,7 @@ pub(crate) mod tests {
bank_forks, bank_forks,
blockstore, blockstore,
None, None,
&mut purge_repair_slot_counter,
); );
let r_bank_forks = bank_forks.read().unwrap(); let r_bank_forks = bank_forks.read().unwrap();
@ -5981,6 +6020,9 @@ pub(crate) mod tests {
assert!(descendants_result.is_none()); assert!(descendants_result.is_none());
} }
} }
assert_eq!(2, purge_repair_slot_counter.len());
assert_eq!(1, *purge_repair_slot_counter.get(&1).unwrap());
assert_eq!(1, *purge_repair_slot_counter.get(&2).unwrap());
} }
fn setup_vote_then_rollback( fn setup_vote_then_rollback(
@ -6057,6 +6099,7 @@ pub(crate) mod tests {
heaviest_subtree_fork_choice, heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
&mut PurgeRepairSlotCounter::default(),
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
assert_eq!( assert_eq!(
@ -6075,6 +6118,7 @@ pub(crate) mod tests {
bank_forks, bank_forks,
blockstore, blockstore,
None, None,
&mut PurgeRepairSlotCounter::default(),
); );
// Check everything was purged properly // Check everything was purged properly