patches rust code formatting in core/src/replay_stage.rs (#29123)

This commit is contained in:
behzad nouri 2022-12-06 22:09:57 +00:00 committed by GitHub
parent 176caf9283
commit df7fd8ae5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 534 additions and 481 deletions

View File

@ -438,19 +438,13 @@ impl ReplayStage {
block_commitment_cache.clone(),
rpc_subscriptions.clone(),
);
#[allow(clippy::cognitive_complexity)]
let t_replay = Builder::new()
.name("solReplayStage".to_string())
.spawn(move || {
let run_replay = move || {
let verify_recyclers = VerifyRecyclers::default();
let _exit = Finalizer::new(exit.clone());
let mut identity_keypair = cluster_info.keypair().clone();
let mut my_pubkey = identity_keypair.pubkey();
let (
mut progress,
mut heaviest_subtree_fork_choice,
) = Self::initialize_progress_and_fork_choice_with_locked_bank_forks(
let (mut progress, mut heaviest_subtree_fork_choice) =
Self::initialize_progress_and_fork_choice_with_locked_bank_forks(
&bank_forks,
&my_pubkey,
&vote_account,
@ -461,12 +455,16 @@ impl ReplayStage {
let mut skipped_slots_info = SkippedSlotsInfo::default();
let mut replay_timing = ReplayTiming::default();
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = GossipDuplicateConfirmedSlots::default();
let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots = EpochSlotsFrozenSlots::default();
let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots =
GossipDuplicateConfirmedSlots::default();
let mut epoch_slots_frozen_slots: EpochSlotsFrozenSlots =
EpochSlotsFrozenSlots::default();
let mut duplicate_slots_to_repair = DuplicateSlotsToRepair::default();
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes = UnfrozenGossipVerifiedVoteHashes::default();
let mut latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks = LatestValidatorVotesForFrozenBanks::default();
let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes =
UnfrozenGossipVerifiedVoteHashes::default();
let mut latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks =
LatestValidatorVotesForFrozenBanks::default();
let mut voted_signatures = Vec::new();
let mut has_new_vote_been_rooted = !wait_for_vote_to_start_leader;
let mut last_vote_refresh_time = LastVoteRefreshTime {
@ -475,7 +473,10 @@ impl ReplayStage {
};
let (working_bank, in_vote_only_mode) = {
let r_bank_forks = bank_forks.read().unwrap();
(r_bank_forks.working_bank(), r_bank_forks.get_vote_only_mode_signal())
(
r_bank_forks.working_bank(),
r_bank_forks.get_vote_only_mode_signal(),
)
};
Self::reset_poh_recorder(
@ -563,7 +564,8 @@ impl ReplayStage {
purge_dead_slots_time.stop();
// Check for any newly confirmed slots detected from gossip.
let mut process_gossip_duplicate_confirmed_slots_time = Measure::start("process_gossip_duplicate_confirmed_slots");
let mut process_gossip_duplicate_confirmed_slots_time =
Measure::start("process_gossip_duplicate_confirmed_slots");
Self::process_gossip_duplicate_confirmed_slots(
&gossip_duplicate_confirmed_slots_receiver,
&blockstore,
@ -579,12 +581,12 @@ impl ReplayStage {
);
process_gossip_duplicate_confirmed_slots_time.stop();
// Ingest any new verified votes from gossip. Important for fork choice
// and switching proofs because these may be votes that haven't yet been
// included in a block, so we may not have yet observed these votes just
// by replaying blocks.
let mut process_unfrozen_gossip_verified_vote_hashes_time = Measure::start("process_gossip_verified_vote_hashes");
let mut process_unfrozen_gossip_verified_vote_hashes_time =
Measure::start("process_gossip_verified_vote_hashes");
Self::process_gossip_verified_vote_hashes(
&gossip_verified_vote_hash_receiver,
&mut unfrozen_gossip_verified_vote_hashes,
@ -650,28 +652,55 @@ impl ReplayStage {
&bank_forks,
);
Self::mark_slots_confirmed(&confirmed_forks, &blockstore, &bank_forks, &mut progress, &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut epoch_slots_frozen_slots, &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender, &mut purge_repair_slot_counter);
Self::mark_slots_confirmed(
&confirmed_forks,
&blockstore,
&bank_forks,
&mut progress,
&mut duplicate_slots_tracker,
&mut heaviest_subtree_fork_choice,
&mut epoch_slots_frozen_slots,
&mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
);
}
compute_slot_stats_time.stop();
let mut select_forks_time = Measure::start("select_forks_time");
let (heaviest_bank, heaviest_bank_on_same_voted_fork) = heaviest_subtree_fork_choice
.select_forks(&frozen_banks, &tower, &progress, &ancestors, &bank_forks);
let (heaviest_bank, heaviest_bank_on_same_voted_fork) =
heaviest_subtree_fork_choice.select_forks(
&frozen_banks,
&tower,
&progress,
&ancestors,
&bank_forks,
);
select_forks_time.stop();
Self::check_for_vote_only_mode(heaviest_bank.slot(), forks_root, &in_vote_only_mode, &bank_forks);
Self::check_for_vote_only_mode(
heaviest_bank.slot(),
forks_root,
&in_vote_only_mode,
&bank_forks,
);
if let Some(heaviest_bank_on_same_voted_fork) = heaviest_bank_on_same_voted_fork.as_ref() {
if let Some(my_latest_landed_vote) = progress.my_latest_landed_vote(heaviest_bank_on_same_voted_fork.slot()) {
Self::refresh_last_vote(&mut tower,
if let Some(heaviest_bank_on_same_voted_fork) =
heaviest_bank_on_same_voted_fork.as_ref()
{
if let Some(my_latest_landed_vote) =
progress.my_latest_landed_vote(heaviest_bank_on_same_voted_fork.slot())
{
Self::refresh_last_vote(
&mut tower,
heaviest_bank_on_same_voted_fork,
my_latest_landed_vote,
&vote_account,
&identity_keypair,
&authorized_voter_keypairs.read().unwrap(),
&mut voted_signatures,
has_new_vote_been_rooted, &mut
last_vote_refresh_time,
has_new_vote_been_rooted,
&mut last_vote_refresh_time,
&voting_sender,
wait_to_vote_slot,
);
@ -768,10 +797,9 @@ impl ReplayStage {
if last_reset != reset_bank.last_blockhash() {
info!(
"vote bank: {:?} reset bank: {:?}",
vote_bank.as_ref().map(|(b, switch_fork_decision)| (
b.slot(),
switch_fork_decision
)),
vote_bank
.as_ref()
.map(|(b, switch_fork_decision)| (b.slot(), switch_fork_decision)),
reset_bank.slot(),
);
let fork_progress = progress
@ -797,9 +825,12 @@ impl ReplayStage {
.and_then(|restored_tower| {
let root_bank = bank_forks.read().unwrap().root_bank();
let slot_history = root_bank.get_slot_history();
restored_tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history)
}).
unwrap_or_else(|err| {
restored_tower.adjust_lockouts_after_replay(
root_bank.slot(),
&slot_history,
)
})
.unwrap_or_else(|err| {
if err.is_file_missing() {
Tower::new_from_bankforks(
&bank_forks.read().unwrap(),
@ -831,10 +862,13 @@ impl ReplayStage {
if let Some(last_voted_slot) = tower.last_voted_slot() {
// If the current heaviest bank is not a descendant of the last voted slot,
// there must be a partition
let partition_detected = Self::is_partition_detected(&ancestors, last_voted_slot, heaviest_bank.slot());
let partition_detected = Self::is_partition_detected(
&ancestors,
last_voted_slot,
heaviest_bank.slot(),
);
if !partition_exists && partition_detected
{
if !partition_exists && partition_detected {
warn!(
"PARTITION DETECTED waiting to join heaviest fork: {} last vote: {:?}, reset slot: {}",
heaviest_bank.slot(),
@ -847,9 +881,7 @@ impl ReplayStage {
("slot", reset_bank.slot() as i64, i64)
);
partition_exists = true;
} else if partition_exists
&& !partition_detected
{
} else if partition_exists && !partition_detected {
warn!(
"PARTITION resolved heaviest fork: {} last vote: {:?}, reset slot: {}",
heaviest_bank.slot(),
@ -865,7 +897,8 @@ impl ReplayStage {
reset_bank_time.stop();
let mut start_leader_time = Measure::start("start_leader_time");
let mut dump_then_repair_correct_slots_time = Measure::start("dump_then_repair_correct_slots_time");
let mut dump_then_repair_correct_slots_time =
Measure::start("dump_then_repair_correct_slots_time");
// Used for correctness check
let poh_bank = poh_recorder.read().unwrap().bank();
// Dump any duplicate slots that have been confirmed by the network in
@ -873,10 +906,20 @@ impl ReplayStage {
//
// Has to be before `maybe_start_leader()`. Otherwise, `ancestors` and `descendants`
// will be outdated, and we cannot assume `poh_bank` will be in either of these maps.
Self::dump_then_repair_correct_slots(&mut duplicate_slots_to_repair, &mut ancestors, &mut descendants, &mut progress, &bank_forks, &blockstore, poh_bank.map(|bank| bank.slot()), &mut purge_repair_slot_counter);
Self::dump_then_repair_correct_slots(
&mut duplicate_slots_to_repair,
&mut ancestors,
&mut descendants,
&mut progress,
&bank_forks,
&blockstore,
poh_bank.map(|bank| bank.slot()),
&mut purge_repair_slot_counter,
);
dump_then_repair_correct_slots_time.stop();
let mut retransmit_not_propagated_time = Measure::start("retransmit_not_propagated_time");
let mut retransmit_not_propagated_time =
Measure::start("retransmit_not_propagated_time");
Self::retransmit_latest_unpropagated_leader_slot(
&poh_recorder,
&retransmit_slots_sender,
@ -949,7 +992,10 @@ impl ReplayStage {
retransmit_not_propagated_time.as_us(),
);
}
})
};
let t_replay = Builder::new()
.name("solReplayStage".to_string())
.spawn(run_replay)
.unwrap();
Ok(Self {
@ -2108,9 +2154,15 @@ impl ReplayStage {
if my_latest_landed_vote >= last_voted_slot
|| heaviest_bank_on_same_fork
.is_hash_valid_for_age(&tower.last_vote_tx_blockhash(), MAX_PROCESSING_AGE)
|| {
// In order to avoid voting on multiple forks all past MAX_PROCESSING_AGE that don't
// include the last voted blockhash
|| last_vote_refresh_time.last_refresh_time.elapsed().as_millis() < MAX_VOTE_REFRESH_INTERVAL_MILLIS as u128
last_vote_refresh_time
.last_refresh_time
.elapsed()
.as_millis()
< MAX_VOTE_REFRESH_INTERVAL_MILLIS as u128
}
{
return;
}
@ -3149,13 +3201,14 @@ impl ReplayStage {
// all its ancestor banks have also reached propagation
// threshold as well (Validators can't have voted for a
// descendant without also getting the ancestor block)
if leader_propagated_stats.is_propagated ||
if leader_propagated_stats.is_propagated || {
// If there's no new validators to record, and there's no
// newly achieved threshold, then there's no further
// information to propagate backwards to past leader blocks
(newly_voted_pubkeys.is_empty() && cluster_slot_pubkeys.is_empty() &&
!did_newly_reach_threshold)
{
newly_voted_pubkeys.is_empty()
&& cluster_slot_pubkeys.is_empty()
&& !did_newly_reach_threshold
} {
break;
}