ReplayStage::new is too long

break into more functions
This commit is contained in:
Stephen Akridge 2019-03-21 11:53:18 -07:00 committed by sakridge
parent 3a7647f611
commit 412ebfcaf2
1 changed files with 168 additions and 112 deletions

View File

@ -93,31 +93,20 @@ impl ReplayStage {
if exit_.load(Ordering::Relaxed) {
break;
}
Self::generate_new_bank_forks(&blocktree, &mut bank_forks.write().unwrap());
let active_banks = bank_forks.read().unwrap().active_banks();
trace!("active banks {:?}", active_banks);
let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some();
for bank_slot in &active_banks {
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
ticks_per_slot = bank.ticks_per_slot();
if bank.collector_id() != my_id {
Self::replay_blocktree_into_bank(
&bank,
&blocktree,
&mut progress,
&forward_entry_sender,
)?;
}
let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1;
if bank.tick_height() == max_tick_height {
Self::process_completed_bank(
&my_id,
bank,
&mut progress,
&slot_full_sender,
);
}
}
Self::replay_active_banks(
&blocktree,
&bank_forks,
&my_id,
&mut ticks_per_slot,
&mut progress,
&forward_entry_sender,
&slot_full_sender,
)?;
if ticks_per_slot == 0 {
let frozen_banks = bank_forks.read().unwrap().frozen_banks();
@ -125,100 +114,23 @@ impl ReplayStage {
ticks_per_slot = bank.ticks_per_slot();
}
let locktower_start = Instant::now();
// Locktower voting
let descendants = bank_forks.read().unwrap().descendants();
let ancestors = bank_forks.read().unwrap().ancestors();
let frozen_banks = bank_forks.read().unwrap().frozen_banks();
trace!("frozen_banks {}", frozen_banks.len());
let mut votable: Vec<(u128, Arc<Bank>)> = frozen_banks
.values()
.filter(|b| {
let is_votable = b.is_votable();
trace!("bank is votable: {} {}", b.slot(), is_votable);
is_votable
})
.filter(|b| {
let has_voted = locktower.has_voted(b.slot());
trace!("bank is has_voted: {} {}", b.slot(), has_voted);
!has_voted
})
.filter(|b| {
let is_locked_out = locktower.is_locked_out(b.slot(), &descendants);
trace!("bank is is_locked_out: {} {}", b.slot(), is_locked_out);
!is_locked_out
})
.map(|bank| {
(
bank,
locktower.collect_vote_lockouts(
bank.slot(),
bank.vote_accounts(),
&ancestors,
),
)
})
.filter(|(b, stake_lockouts)| {
let vote_threshold =
locktower.check_vote_stake_threshold(b.slot(), &stake_lockouts);
debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold);
vote_threshold
})
.map(|(b, stake_lockouts)| {
(locktower.calculate_weight(&stake_lockouts), b.clone())
})
.collect();
votable.sort_by_key(|b| b.0);
trace!("votable_banks {}", votable.len());
let ms = timing::duration_as_ms(&locktower_start.elapsed());
if !votable.is_empty() {
let weights: Vec<u128> = votable.iter().map(|x| x.0).collect();
info!(
"@{:?} locktower duration: {:?} len: {} weights: {:?}",
timing::timestamp(),
ms,
votable.len(),
weights
);
}
inc_new_counter_info!("replay_stage-locktower_duration", ms as usize);
let votable = Self::generate_votable_banks(&bank_forks, &locktower);
if let Some((_, bank)) = votable.last() {
subscriptions.notify_subscribers(&bank);
if let Some(ref voting_keypair) = voting_keypair {
let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote(
&vote_account,
keypair,
bank.slot(),
bank.last_blockhash(),
0,
);
if let Some(new_root) = locktower.record_vote(bank.slot()) {
bank_forks.write().unwrap().set_root(new_root);
Self::handle_new_root(&bank_forks, &mut progress);
}
locktower.update_epoch(&bank);
cluster_info.write().unwrap().push_vote(vote);
}
let next_leader_slot =
leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank);
poh_recorder.lock().unwrap().reset(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
next_leader_slot,
ticks_per_slot,
);
debug!(
"{:?} voted and reset poh at {}. next leader slot {:?}",
my_id,
bank.tick_height(),
next_leader_slot
Self::handle_votable_bank(
&bank,
&bank_forks,
&mut locktower,
&mut progress,
&voting_keypair,
&vote_account,
&cluster_info,
);
Self::reset_poh_recorder(&my_id, &bank, &poh_recorder, ticks_per_slot);
is_tpu_bank_active = false;
}
@ -354,6 +266,150 @@ impl ReplayStage {
Ok(())
}
fn handle_votable_bank<T>(
bank: &Arc<Bank>,
bank_forks: &Arc<RwLock<BankForks>>,
locktower: &mut Locktower,
progress: &mut HashMap<u64, (Hash, usize)>,
voting_keypair: &Option<Arc<T>>,
vote_account: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) where
T: 'static + KeypairUtil + Send + Sync,
{
if let Some(ref voting_keypair) = voting_keypair {
let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote(
&vote_account,
keypair,
bank.slot(),
bank.last_blockhash(),
0,
);
if let Some(new_root) = locktower.record_vote(bank.slot()) {
bank_forks.write().unwrap().set_root(new_root);
Self::handle_new_root(&bank_forks, progress);
}
locktower.update_epoch(&bank);
cluster_info.write().unwrap().push_vote(vote);
}
}
fn reset_poh_recorder(
my_id: &Pubkey,
bank: &Arc<Bank>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
ticks_per_slot: u64,
) {
let next_leader_slot = leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank);
poh_recorder.lock().unwrap().reset(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
next_leader_slot,
ticks_per_slot,
);
debug!(
"{:?} voted and reset poh at {}. next leader slot {:?}",
my_id,
bank.tick_height(),
next_leader_slot
);
}
fn replay_active_banks(
blocktree: &Arc<Blocktree>,
bank_forks: &Arc<RwLock<BankForks>>,
my_id: &Pubkey,
ticks_per_slot: &mut u64,
progress: &mut HashMap<u64, (Hash, usize)>,
forward_entry_sender: &EntrySender,
slot_full_sender: &Sender<(u64, Pubkey)>,
) -> result::Result<()> {
let active_banks = bank_forks.read().unwrap().active_banks();
trace!("active banks {:?}", active_banks);
for bank_slot in &active_banks {
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
*ticks_per_slot = bank.ticks_per_slot();
if bank.collector_id() != *my_id {
Self::replay_blocktree_into_bank(
&bank,
&blocktree,
progress,
&forward_entry_sender,
)?;
}
let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1;
if bank.tick_height() == max_tick_height {
Self::process_completed_bank(my_id, bank, progress, slot_full_sender);
}
}
Ok(())
}
fn generate_votable_banks(
bank_forks: &Arc<RwLock<BankForks>>,
locktower: &Locktower,
) -> Vec<(u128, Arc<Bank>)> {
let locktower_start = Instant::now();
// Locktower voting
let descendants = bank_forks.read().unwrap().descendants();
let ancestors = bank_forks.read().unwrap().ancestors();
let frozen_banks = bank_forks.read().unwrap().frozen_banks();
trace!("frozen_banks {}", frozen_banks.len());
let mut votable: Vec<(u128, Arc<Bank>)> = frozen_banks
.values()
.filter(|b| {
let is_votable = b.is_votable();
trace!("bank is votable: {} {}", b.slot(), is_votable);
is_votable
})
.filter(|b| {
let has_voted = locktower.has_voted(b.slot());
trace!("bank is has_voted: {} {}", b.slot(), has_voted);
!has_voted
})
.filter(|b| {
let is_locked_out = locktower.is_locked_out(b.slot(), &descendants);
trace!("bank is is_locked_out: {} {}", b.slot(), is_locked_out);
!is_locked_out
})
.map(|bank| {
(
bank,
locktower.collect_vote_lockouts(bank.slot(), bank.vote_accounts(), &ancestors),
)
})
.filter(|(b, stake_lockouts)| {
let vote_threshold =
locktower.check_vote_stake_threshold(b.slot(), &stake_lockouts);
debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold);
vote_threshold
})
.map(|(b, stake_lockouts)| (locktower.calculate_weight(&stake_lockouts), b.clone()))
.collect();
votable.sort_by_key(|b| b.0);
let ms = timing::duration_as_ms(&locktower_start.elapsed());
trace!("votable_banks {}", votable.len());
if !votable.is_empty() {
let weights: Vec<u128> = votable.iter().map(|x| x.0).collect();
info!(
"@{:?} locktower duration: {:?} len: {} weights: {:?}",
timing::timestamp(),
ms,
votable.len(),
weights
);
}
inc_new_counter_info!("replay_stage-locktower_duration", ms as usize);
votable
}
pub fn load_blocktree_entries(
bank: &Bank,
blocktree: &Blocktree,