Remove loop (#8493)

This commit is contained in:
carllin 2020-02-26 19:59:29 -08:00 committed by GitHub
parent 8c07ba635e
commit 5f766cd20b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 117 additions and 125 deletions

View File

@ -242,137 +242,129 @@ impl ReplayStage {
Self::report_memory(&allocated, "replay_active_banks", start);
let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
loop {
let start = allocated.get();
let mut frozen_banks: Vec<_> = bank_forks
.read()
.unwrap()
.frozen_banks()
.values()
.cloned()
.collect();
let newly_computed_slot_stats = Self::compute_bank_stats(
&my_pubkey,
&ancestors,
&mut frozen_banks,
let start = allocated.get();
let mut frozen_banks: Vec<_> = bank_forks
.read()
.unwrap()
.frozen_banks()
.values()
.cloned()
.collect();
let newly_computed_slot_stats = Self::compute_bank_stats(
&my_pubkey,
&ancestors,
&mut frozen_banks,
&tower,
&mut progress,
);
for slot in newly_computed_slot_stats {
let fork_stats = &progress.get(&slot).unwrap().fork_stats;
let confirmed_forks = Self::confirm_forks(
&tower,
&mut progress,
&fork_stats.stake_lockouts,
fork_stats.total_staked,
&progress,
&bank_forks,
);
for slot in newly_computed_slot_stats {
let fork_stats = &progress.get(&slot).unwrap().fork_stats;
let confirmed_forks = Self::confirm_forks(
&tower,
&fork_stats.stake_lockouts,
fork_stats.total_staked,
&progress,
&bank_forks,
);
for slot in confirmed_forks {
progress
.get_mut(&slot)
.unwrap()
.fork_stats
.confirmation_reported = true;
}
}
let vote_bank = Self::select_fork(&frozen_banks, &tower, &progress);
Self::report_memory(&allocated, "select_fork", start);
if vote_bank.is_none() {
break;
}
let bank = vote_bank.unwrap();
let (is_locked_out, vote_threshold, fork_weight, total_staked) = {
let fork_stats = &progress.get(&bank.slot()).unwrap().fork_stats;
(
fork_stats.is_locked_out,
fork_stats.vote_threshold,
fork_stats.weight,
fork_stats.total_staked,
)
};
let mut done = false;
let mut vote_bank_slot = None;
let start = allocated.get();
if !is_locked_out && vote_threshold {
info!("voting: {} {}", bank.slot(), fork_weight);
subscriptions.notify_subscribers(bank.slot(), &bank_forks);
if let Some(votable_leader) =
leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank))
{
Self::log_leader_change(
&my_pubkey,
bank.slot(),
&mut current_leader,
&votable_leader,
);
}
vote_bank_slot = Some(bank.slot());
Self::handle_votable_bank(
&bank,
&bank_forks,
&mut tower,
&mut progress,
&vote_account,
&voting_keypair,
&cluster_info,
&blockstore,
&leader_schedule_cache,
&root_bank_sender,
total_staked,
&lockouts_sender,
&snapshot_package_sender,
&latest_root_senders,
)?;
}
Self::report_memory(&allocated, "votable_bank", start);
let start = allocated.get();
if last_reset != bank.last_blockhash() {
Self::reset_poh_recorder(
&my_pubkey,
&blockstore,
&bank,
&poh_recorder,
&leader_schedule_cache,
);
last_reset = bank.last_blockhash();
tpu_has_bank = false;
info!(
"vote bank: {:?} reset bank: {}",
vote_bank_slot,
bank.slot()
);
if !partition && vote_bank_slot != Some(bank.slot()) {
warn!(
"PARTITION DETECTED waiting to join fork: {} last vote: {:?}",
bank.slot(),
tower.last_vote()
);
inc_new_counter_info!("replay_stage-partition_detected", 1);
datapoint_info!(
"replay_stage-partition",
("slot", bank.slot() as i64, i64)
);
partition = true;
} else if partition && vote_bank_slot == Some(bank.slot()) {
warn!(
"PARTITION resolved fork: {} last vote: {:?}",
bank.slot(),
tower.last_vote()
);
partition = false;
inc_new_counter_info!("replay_stage-partition_resolved", 1);
}
} else {
done = true;
}
Self::report_memory(&allocated, "reset_bank", start);
if done {
break;
for slot in confirmed_forks {
progress
.get_mut(&slot)
.unwrap()
.fork_stats
.confirmation_reported = true;
}
}
let vote_bank = Self::select_fork(&frozen_banks, &tower, &progress);
Self::report_memory(&allocated, "select_fork", start);
if vote_bank.is_none() {
break;
}
let bank = vote_bank.unwrap();
let (is_locked_out, vote_threshold, fork_weight, total_staked) = {
let fork_stats = &progress.get(&bank.slot()).unwrap().fork_stats;
(
fork_stats.is_locked_out,
fork_stats.vote_threshold,
fork_stats.weight,
fork_stats.total_staked,
)
};
let mut vote_bank_slot = None;
let start = allocated.get();
if !is_locked_out && vote_threshold {
info!("voting: {} {}", bank.slot(), fork_weight);
subscriptions.notify_subscribers(bank.slot(), &bank_forks);
if let Some(votable_leader) =
leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank))
{
Self::log_leader_change(
&my_pubkey,
bank.slot(),
&mut current_leader,
&votable_leader,
);
}
vote_bank_slot = Some(bank.slot());
Self::handle_votable_bank(
&bank,
&bank_forks,
&mut tower,
&mut progress,
&vote_account,
&voting_keypair,
&cluster_info,
&blockstore,
&leader_schedule_cache,
&root_bank_sender,
total_staked,
&lockouts_sender,
&snapshot_package_sender,
&latest_root_senders,
)?;
}
Self::report_memory(&allocated, "votable_bank", start);
let start = allocated.get();
if last_reset != bank.last_blockhash() {
Self::reset_poh_recorder(
&my_pubkey,
&blockstore,
&bank,
&poh_recorder,
&leader_schedule_cache,
);
last_reset = bank.last_blockhash();
tpu_has_bank = false;
info!(
"vote bank: {:?} reset bank: {}",
vote_bank_slot,
bank.slot()
);
if !partition && vote_bank_slot != Some(bank.slot()) {
warn!(
"PARTITION DETECTED waiting to join fork: {} last vote: {:?}",
bank.slot(),
tower.last_vote()
);
inc_new_counter_info!("replay_stage-partition_detected", 1);
datapoint_info!(
"replay_stage-partition",
("slot", bank.slot() as i64, i64)
);
partition = true;
} else if partition && vote_bank_slot == Some(bank.slot()) {
warn!(
"PARTITION resolved fork: {} last vote: {:?}",
bank.slot(),
tower.last_vote()
);
partition = false;
inc_new_counter_info!("replay_stage-partition_resolved", 1);
}
}
Self::report_memory(&allocated, "reset_bank", start);
let start = allocated.get();
if !tpu_has_bank {
Self::maybe_start_leader(