parent
c5b2db72a2
commit
0139236464
|
@ -358,11 +358,12 @@ impl Tower {
|
||||||
|
|
||||||
pub(crate) fn check_switch_threshold(
|
pub(crate) fn check_switch_threshold(
|
||||||
&self,
|
&self,
|
||||||
_slot: u64,
|
_slot: Slot,
|
||||||
_ancestors: &HashMap<Slot, HashSet<u64>>,
|
_ancestors: &HashMap<Slot, HashSet<u64>>,
|
||||||
_descendants: &HashMap<Slot, HashSet<u64>>,
|
_descendants: &HashMap<Slot, HashSet<u64>>,
|
||||||
_progress: &ProgressMap,
|
_progress: &ProgressMap,
|
||||||
_total_stake: u64,
|
_total_epoch_stake: u64,
|
||||||
|
_epoch_vote_accounts: &HashMap<Pubkey, (u64, Account)>,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
@ -647,7 +648,6 @@ pub mod test {
|
||||||
bank_forks,
|
bank_forks,
|
||||||
progress,
|
progress,
|
||||||
&None,
|
&None,
|
||||||
&mut 0,
|
|
||||||
&mut HashSet::new(),
|
&mut HashSet::new(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -177,11 +177,6 @@ impl ReplayStage {
|
||||||
let mut current_leader = None;
|
let mut current_leader = None;
|
||||||
let mut last_reset = Hash::default();
|
let mut last_reset = Hash::default();
|
||||||
let mut partition = false;
|
let mut partition = false;
|
||||||
let mut earliest_vote_on_fork = {
|
|
||||||
let slots = tower.last_vote().slots;
|
|
||||||
slots.last().cloned().unwrap_or(0)
|
|
||||||
};
|
|
||||||
let mut switch_threshold = false;
|
|
||||||
let mut skipped_slots_info = SkippedSlotsInfo::default();
|
let mut skipped_slots_info = SkippedSlotsInfo::default();
|
||||||
loop {
|
loop {
|
||||||
let allocated = thread_mem_usage::Allocatedp::default();
|
let allocated = thread_mem_usage::Allocatedp::default();
|
||||||
|
@ -220,7 +215,7 @@ impl ReplayStage {
|
||||||
Self::report_memory(&allocated, "replay_active_banks", start);
|
Self::report_memory(&allocated, "replay_active_banks", start);
|
||||||
|
|
||||||
let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
|
let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
|
||||||
let descendants = Arc::new(HashMap::new());
|
let descendants = HashMap::new();
|
||||||
let start = allocated.get();
|
let start = allocated.get();
|
||||||
let mut frozen_banks: Vec<_> = bank_forks
|
let mut frozen_banks: Vec<_> = bank_forks
|
||||||
.read()
|
.read()
|
||||||
|
@ -259,7 +254,7 @@ impl ReplayStage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let (heaviest_bank, votable_bank_on_same_fork) =
|
let (heaviest_bank, heaviest_bank_on_same_fork) =
|
||||||
Self::select_forks(&frozen_banks, &tower, &progress, &ancestors);
|
Self::select_forks(&frozen_banks, &tower, &progress, &ancestors);
|
||||||
|
|
||||||
Self::report_memory(&allocated, "select_fork", start);
|
Self::report_memory(&allocated, "select_fork", start);
|
||||||
|
@ -267,9 +262,7 @@ impl ReplayStage {
|
||||||
let (vote_bank, reset_bank, failure_reasons) =
|
let (vote_bank, reset_bank, failure_reasons) =
|
||||||
Self::select_vote_and_reset_forks(
|
Self::select_vote_and_reset_forks(
|
||||||
&heaviest_bank,
|
&heaviest_bank,
|
||||||
&votable_bank_on_same_fork,
|
&heaviest_bank_on_same_fork,
|
||||||
earliest_vote_on_fork,
|
|
||||||
&mut switch_threshold,
|
|
||||||
&ancestors,
|
&ancestors,
|
||||||
&descendants,
|
&descendants,
|
||||||
&progress,
|
&progress,
|
||||||
|
@ -311,46 +304,36 @@ impl ReplayStage {
|
||||||
let start = allocated.get();
|
let start = allocated.get();
|
||||||
|
|
||||||
// Vote on a fork
|
// Vote on a fork
|
||||||
let voted_on_different_fork = {
|
if let Some(ref vote_bank) = vote_bank {
|
||||||
if let Some(ref vote_bank) = vote_bank {
|
subscriptions.notify_subscribers(block_commitment_cache.read().unwrap().slot(), &bank_forks);
|
||||||
subscriptions.notify_subscribers(block_commitment_cache.read().unwrap().slot(), &bank_forks);
|
if let Some(votable_leader) = leader_schedule_cache
|
||||||
if let Some(votable_leader) = leader_schedule_cache
|
.slot_leader_at(vote_bank.slot(), Some(vote_bank))
|
||||||
.slot_leader_at(vote_bank.slot(), Some(vote_bank))
|
{
|
||||||
{
|
Self::log_leader_change(
|
||||||
Self::log_leader_change(
|
&my_pubkey,
|
||||||
&my_pubkey,
|
vote_bank.slot(),
|
||||||
vote_bank.slot(),
|
&mut current_leader,
|
||||||
&mut current_leader,
|
&votable_leader,
|
||||||
&votable_leader,
|
);
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
Self::handle_votable_bank(
|
|
||||||
&vote_bank,
|
|
||||||
&bank_forks,
|
|
||||||
&mut tower,
|
|
||||||
&mut progress,
|
|
||||||
&vote_account,
|
|
||||||
&authorized_voter_keypairs,
|
|
||||||
&cluster_info,
|
|
||||||
&blockstore,
|
|
||||||
&leader_schedule_cache,
|
|
||||||
&root_bank_sender,
|
|
||||||
&lockouts_sender,
|
|
||||||
&accounts_hash_sender,
|
|
||||||
&latest_root_senders,
|
|
||||||
&mut earliest_vote_on_fork,
|
|
||||||
&mut all_pubkeys,
|
|
||||||
&subscriptions,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
ancestors
|
|
||||||
.get(&vote_bank.slot())
|
|
||||||
.unwrap()
|
|
||||||
.contains(&earliest_vote_on_fork)
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Self::handle_votable_bank(
|
||||||
|
&vote_bank,
|
||||||
|
&bank_forks,
|
||||||
|
&mut tower,
|
||||||
|
&mut progress,
|
||||||
|
&vote_account,
|
||||||
|
&authorized_voter_keypairs,
|
||||||
|
&cluster_info,
|
||||||
|
&blockstore,
|
||||||
|
&leader_schedule_cache,
|
||||||
|
&root_bank_sender,
|
||||||
|
&lockouts_sender,
|
||||||
|
&accounts_hash_sender,
|
||||||
|
&latest_root_senders,
|
||||||
|
&mut all_pubkeys,
|
||||||
|
&subscriptions,
|
||||||
|
)?;
|
||||||
};
|
};
|
||||||
|
|
||||||
Self::report_memory(&allocated, "votable_bank", start);
|
Self::report_memory(&allocated, "votable_bank", start);
|
||||||
|
@ -358,12 +341,7 @@ impl ReplayStage {
|
||||||
|
|
||||||
// Reset onto a fork
|
// Reset onto a fork
|
||||||
if let Some(reset_bank) = reset_bank {
|
if let Some(reset_bank) = reset_bank {
|
||||||
let selected_same_fork = ancestors
|
|
||||||
.get(&reset_bank.slot())
|
|
||||||
.unwrap()
|
|
||||||
.contains(&earliest_vote_on_fork);
|
|
||||||
if last_reset != reset_bank.last_blockhash()
|
if last_reset != reset_bank.last_blockhash()
|
||||||
&& (selected_same_fork || switch_threshold)
|
|
||||||
{
|
{
|
||||||
info!(
|
info!(
|
||||||
"vote bank: {:?} reset bank: {:?}",
|
"vote bank: {:?} reset bank: {:?}",
|
||||||
|
@ -413,17 +391,6 @@ impl ReplayStage {
|
||||||
}
|
}
|
||||||
Self::report_memory(&allocated, "reset_bank", start);
|
Self::report_memory(&allocated, "reset_bank", start);
|
||||||
|
|
||||||
// If we voted on a different fork, update the earliest vote
|
|
||||||
// to this slot, clear the switch threshold
|
|
||||||
if voted_on_different_fork {
|
|
||||||
earliest_vote_on_fork = vote_bank
|
|
||||||
.expect("voted_on_different_fork only set if vote_bank.is_some()")
|
|
||||||
.slot();
|
|
||||||
// Clear the thresholds after voting on different
|
|
||||||
// fork
|
|
||||||
switch_threshold = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
let start = allocated.get();
|
let start = allocated.get();
|
||||||
if !tpu_has_bank {
|
if !tpu_has_bank {
|
||||||
Self::maybe_start_leader(
|
Self::maybe_start_leader(
|
||||||
|
@ -718,7 +685,6 @@ impl ReplayStage {
|
||||||
lockouts_sender: &Sender<CommitmentAggregationData>,
|
lockouts_sender: &Sender<CommitmentAggregationData>,
|
||||||
accounts_hash_sender: &Option<SnapshotPackageSender>,
|
accounts_hash_sender: &Option<SnapshotPackageSender>,
|
||||||
latest_root_senders: &[Sender<Slot>],
|
latest_root_senders: &[Sender<Slot>],
|
||||||
earliest_vote_on_fork: &mut Slot,
|
|
||||||
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
||||||
subscriptions: &Arc<RpcSubscriptions>,
|
subscriptions: &Arc<RpcSubscriptions>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
@ -751,7 +717,6 @@ impl ReplayStage {
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
progress,
|
progress,
|
||||||
accounts_hash_sender,
|
accounts_hash_sender,
|
||||||
earliest_vote_on_fork,
|
|
||||||
all_pubkeys,
|
all_pubkeys,
|
||||||
);
|
);
|
||||||
subscriptions.notify_roots(rooted_slots);
|
subscriptions.notify_roots(rooted_slots);
|
||||||
|
@ -1144,7 +1109,8 @@ impl ReplayStage {
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
let last_vote = tower.last_vote().slots.last().cloned();
|
let last_vote = tower.last_vote().slots.last().cloned();
|
||||||
let mut last_votable_on_same_fork = None;
|
let mut heaviest_bank_on_same_fork = None;
|
||||||
|
let mut heaviest_same_fork_weight = 0;
|
||||||
let stats: Vec<&ForkStats> = frozen_banks
|
let stats: Vec<&ForkStats> = frozen_banks
|
||||||
.iter()
|
.iter()
|
||||||
.map(|bank| {
|
.map(|bank| {
|
||||||
|
@ -1160,15 +1126,20 @@ impl ReplayStage {
|
||||||
.get(&bank.slot())
|
.get(&bank.slot())
|
||||||
.expect("Entry in frozen banks must exist in ancestors")
|
.expect("Entry in frozen banks must exist in ancestors")
|
||||||
.contains(&last_vote)
|
.contains(&last_vote)
|
||||||
&& stats.vote_threshold
|
|
||||||
{
|
{
|
||||||
// Descendant of last vote cannot be locked out
|
// Descendant of last vote cannot be locked out
|
||||||
assert!(!stats.is_locked_out);
|
assert!(!stats.is_locked_out);
|
||||||
|
|
||||||
// ancestors(slot) should not contain the slot itself,
|
// ancestors(slot) should not contain the slot itself,
|
||||||
// so we shouldd never get the same bank as the last vote
|
// so we should never get the same bank as the last vote
|
||||||
assert_ne!(bank.slot(), last_vote);
|
assert_ne!(bank.slot(), last_vote);
|
||||||
last_votable_on_same_fork = Some(bank.clone());
|
// highest weight, lowest slot first. frozen_banks is sorted
|
||||||
|
// from least slot to greatest slot, so if two banks have
|
||||||
|
// the same fork weight, the lower slot will be picked
|
||||||
|
if stats.fork_weight > heaviest_same_fork_weight {
|
||||||
|
heaviest_bank_on_same_fork = Some(bank.clone());
|
||||||
|
heaviest_same_fork_weight = stats.fork_weight;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1219,17 +1190,15 @@ impl ReplayStage {
|
||||||
("tower_duration", ms as i64, i64),
|
("tower_duration", ms as i64, i64),
|
||||||
);
|
);
|
||||||
|
|
||||||
(rv.map(|x| x.0.clone()), last_votable_on_same_fork)
|
(rv.map(|x| x.0.clone()), heaviest_bank_on_same_fork)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Given a heaviest bank, `heaviest_bank` and the next votable bank
|
// Given a heaviest bank, `heaviest_bank` and the next votable bank
|
||||||
// `votable_bank_on_same_fork` as the validator's last vote, return
|
// `heaviest_bank_on_same_fork` as the validator's last vote, return
|
||||||
// a bank to vote on, a bank to reset to,
|
// a bank to vote on, a bank to reset to,
|
||||||
pub(crate) fn select_vote_and_reset_forks(
|
pub(crate) fn select_vote_and_reset_forks(
|
||||||
heaviest_bank: &Option<Arc<Bank>>,
|
heaviest_bank: &Option<Arc<Bank>>,
|
||||||
votable_bank_on_same_fork: &Option<Arc<Bank>>,
|
heaviest_bank_on_same_fork: &Option<Arc<Bank>>,
|
||||||
earliest_vote_on_fork: u64,
|
|
||||||
switch_threshold: &mut bool,
|
|
||||||
ancestors: &HashMap<u64, HashSet<u64>>,
|
ancestors: &HashMap<u64, HashSet<u64>>,
|
||||||
descendants: &HashMap<u64, HashSet<u64>>,
|
descendants: &HashMap<u64, HashSet<u64>>,
|
||||||
progress: &ProgressMap,
|
progress: &ProgressMap,
|
||||||
|
@ -1255,52 +1224,40 @@ impl ReplayStage {
|
||||||
let mut failure_reasons = vec![];
|
let mut failure_reasons = vec![];
|
||||||
let selected_fork = {
|
let selected_fork = {
|
||||||
if let Some(bank) = heaviest_bank {
|
if let Some(bank) = heaviest_bank {
|
||||||
let selected_same_fork = ancestors
|
let switch_threshold = tower.check_switch_threshold(
|
||||||
.get(&bank.slot())
|
bank.slot(),
|
||||||
.unwrap()
|
&ancestors,
|
||||||
.contains(&earliest_vote_on_fork);
|
&descendants,
|
||||||
if selected_same_fork {
|
&progress,
|
||||||
// If the heaviest bank is on the same fork as the last
|
bank.total_epoch_stake(),
|
||||||
// vote, then there's no need to check the switch threshold.
|
bank.epoch_vote_accounts(bank.epoch()).expect(
|
||||||
// Just vote for the latest votable bank on the same fork,
|
"Bank epoch vote accounts must contain entry for the bank's own epoch",
|
||||||
// which is `votable_bank_on_same_fork`.
|
),
|
||||||
votable_bank_on_same_fork
|
);
|
||||||
|
if !switch_threshold {
|
||||||
|
// If we can't switch, then reset to the the next votable
|
||||||
|
// bank on the same fork as our last vote, but don't vote
|
||||||
|
info!(
|
||||||
|
"Waiting to switch to {}, voting on {:?} on same fork for now",
|
||||||
|
bank.slot(),
|
||||||
|
heaviest_bank_on_same_fork.as_ref().map(|b| b.slot())
|
||||||
|
);
|
||||||
|
failure_reasons.push(HeaviestForkFailures::FailedSwitchThreshold(bank.slot()));
|
||||||
|
heaviest_bank_on_same_fork
|
||||||
|
.as_ref()
|
||||||
|
.map(|b| (b, switch_threshold))
|
||||||
} else {
|
} else {
|
||||||
if !*switch_threshold {
|
// If the switch threshold is observed, halt voting on
|
||||||
let total_staked =
|
// the current fork and attempt to vote/reset Poh to
|
||||||
progress.get_fork_stats(bank.slot()).unwrap().total_staked;
|
// the heaviest bank
|
||||||
*switch_threshold = tower.check_switch_threshold(
|
heaviest_bank.as_ref().map(|b| (b, switch_threshold))
|
||||||
earliest_vote_on_fork,
|
|
||||||
&ancestors,
|
|
||||||
&descendants,
|
|
||||||
&progress,
|
|
||||||
total_staked,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
if !*switch_threshold {
|
|
||||||
// If we can't switch, then vote on the the next votable
|
|
||||||
// bank on the same fork as our last vote
|
|
||||||
info!(
|
|
||||||
"Waiting to switch to {}, voting on {:?} on same fork for now",
|
|
||||||
bank.slot(),
|
|
||||||
votable_bank_on_same_fork.as_ref().map(|b| b.slot())
|
|
||||||
);
|
|
||||||
failure_reasons
|
|
||||||
.push(HeaviestForkFailures::FailedSwitchThreshold(bank.slot()));
|
|
||||||
votable_bank_on_same_fork
|
|
||||||
} else {
|
|
||||||
// If the switch threshold is observed, halt voting on
|
|
||||||
// the current fork and attempt to vote/reset Poh/switch to
|
|
||||||
// theh heaviest bank
|
|
||||||
heaviest_bank
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
&None
|
None
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(bank) = selected_fork {
|
if let Some((bank, switch_threshold)) = selected_fork {
|
||||||
let (is_locked_out, vote_threshold, is_leader_slot, fork_weight) = {
|
let (is_locked_out, vote_threshold, is_leader_slot, fork_weight) = {
|
||||||
let fork_stats = progress.get_fork_stats(bank.slot()).unwrap();
|
let fork_stats = progress.get_fork_stats(bank.slot()).unwrap();
|
||||||
let propagated_stats = &progress.get_propagated_stats(bank.slot()).unwrap();
|
let propagated_stats = &progress.get_propagated_stats(bank.slot()).unwrap();
|
||||||
|
@ -1323,16 +1280,15 @@ impl ReplayStage {
|
||||||
if !propagation_confirmed {
|
if !propagation_confirmed {
|
||||||
failure_reasons.push(HeaviestForkFailures::NoPropagatedConfirmation(bank.slot()));
|
failure_reasons.push(HeaviestForkFailures::NoPropagatedConfirmation(bank.slot()));
|
||||||
}
|
}
|
||||||
|
if !switch_threshold {
|
||||||
|
failure_reasons.push(HeaviestForkFailures::FailedSwitchThreshold(bank.slot()));
|
||||||
|
}
|
||||||
|
|
||||||
if !is_locked_out && vote_threshold && propagation_confirmed {
|
if !is_locked_out && vote_threshold && propagation_confirmed && switch_threshold {
|
||||||
info!("voting: {} {}", bank.slot(), fork_weight);
|
info!("voting: {} {}", bank.slot(), fork_weight);
|
||||||
(
|
(Some(bank.clone()), Some(bank.clone()), failure_reasons)
|
||||||
selected_fork.clone(),
|
|
||||||
selected_fork.clone(),
|
|
||||||
failure_reasons,
|
|
||||||
)
|
|
||||||
} else {
|
} else {
|
||||||
(None, selected_fork.clone(), failure_reasons)
|
(None, Some(bank.clone()), failure_reasons)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
(None, None, failure_reasons)
|
(None, None, failure_reasons)
|
||||||
|
@ -1508,7 +1464,6 @@ impl ReplayStage {
|
||||||
bank_forks: &RwLock<BankForks>,
|
bank_forks: &RwLock<BankForks>,
|
||||||
progress: &mut ProgressMap,
|
progress: &mut ProgressMap,
|
||||||
accounts_hash_sender: &Option<SnapshotPackageSender>,
|
accounts_hash_sender: &Option<SnapshotPackageSender>,
|
||||||
earliest_vote_on_fork: &mut u64,
|
|
||||||
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
||||||
) {
|
) {
|
||||||
let old_epoch = bank_forks.read().unwrap().root_bank().epoch();
|
let old_epoch = bank_forks.read().unwrap().root_bank().epoch();
|
||||||
|
@ -1521,7 +1476,6 @@ impl ReplayStage {
|
||||||
if old_epoch != new_epoch {
|
if old_epoch != new_epoch {
|
||||||
all_pubkeys.retain(|x| Rc::strong_count(x) > 1);
|
all_pubkeys.retain(|x| Rc::strong_count(x) > 1);
|
||||||
}
|
}
|
||||||
*earliest_vote_on_fork = std::cmp::max(new_root, *earliest_vote_on_fork);
|
|
||||||
progress.handle_new_root(&r_bank_forks);
|
progress.handle_new_root(&r_bank_forks);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2108,30 +2062,10 @@ pub(crate) mod tests {
|
||||||
for i in 0..=root {
|
for i in 0..=root {
|
||||||
progress.insert(i, ForkProgress::new(Hash::default(), None, None));
|
progress.insert(i, ForkProgress::new(Hash::default(), None, None));
|
||||||
}
|
}
|
||||||
let mut earliest_vote_on_fork = root - 1;
|
ReplayStage::handle_new_root(root, &bank_forks, &mut progress, &None, &mut HashSet::new());
|
||||||
ReplayStage::handle_new_root(
|
|
||||||
root,
|
|
||||||
&bank_forks,
|
|
||||||
&mut progress,
|
|
||||||
&None,
|
|
||||||
&mut earliest_vote_on_fork,
|
|
||||||
&mut HashSet::new(),
|
|
||||||
);
|
|
||||||
assert_eq!(bank_forks.read().unwrap().root(), root);
|
assert_eq!(bank_forks.read().unwrap().root(), root);
|
||||||
assert_eq!(progress.len(), 1);
|
assert_eq!(progress.len(), 1);
|
||||||
assert_eq!(earliest_vote_on_fork, root);
|
|
||||||
assert!(progress.get(&root).is_some());
|
assert!(progress.get(&root).is_some());
|
||||||
|
|
||||||
earliest_vote_on_fork = root + 1;
|
|
||||||
ReplayStage::handle_new_root(
|
|
||||||
root,
|
|
||||||
&bank_forks,
|
|
||||||
&mut progress,
|
|
||||||
&None,
|
|
||||||
&mut earliest_vote_on_fork,
|
|
||||||
&mut HashSet::new(),
|
|
||||||
);
|
|
||||||
assert_eq!(earliest_vote_on_fork, root + 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
Loading…
Reference in New Issue