wen_restart: replace get_aggregate_result() with more methods (#254)

* Replace AggregateResult with more methods.

* Rename slots_to_repair() to slots_to_repair_iter().
This commit is contained in:
Wen 2024-03-14 19:01:17 -07:00 committed by GitHub
parent fba70c8504
commit f5a3f2476a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 50 additions and 43 deletions

View File

@ -22,11 +22,6 @@ pub struct LastVotedForkSlotsAggregate {
slots_to_repair: HashSet<Slot>,
}
pub struct LastVotedForkSlotsAggregateResult {
pub slots_to_repair: Vec<Slot>,
pub active_percent: f64, /* 0 ~ 100.0 */
}
impl LastVotedForkSlotsAggregate {
pub(crate) fn new(
root_slot: Slot,
@ -131,16 +126,16 @@ impl LastVotedForkSlotsAggregate {
Some(record)
}
pub(crate) fn get_aggregate_result(&self) -> LastVotedForkSlotsAggregateResult {
pub(crate) fn active_percent(&self) -> f64 {
let total_stake = self.epoch_stakes.total_stake();
let total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| {
sum.saturating_add(Self::validator_stake(&self.epoch_stakes, pubkey))
});
let active_percent = total_active_stake as f64 / total_stake as f64 * 100.0;
LastVotedForkSlotsAggregateResult {
slots_to_repair: self.slots_to_repair.iter().cloned().collect(),
active_percent,
total_active_stake as f64 / total_stake as f64 * 100.0
}
pub(crate) fn slots_to_repair_iter(&self) -> impl Iterator<Item = &Slot> {
self.slots_to_repair.iter()
}
}
@ -237,11 +232,15 @@ mod tests {
}),
);
}
let result = test_state.slots_aggregate.get_aggregate_result();
let mut expected_active_percent =
(initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0;
assert_eq!(result.active_percent, expected_active_percent);
assert!(result.slots_to_repair.is_empty());
assert_eq!(
test_state.slots_aggregate.active_percent(),
(initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0
);
assert!(test_state
.slots_aggregate
.slots_to_repair_iter()
.next()
.is_none());
let new_active_validator = test_state.validator_voting_keypairs
[initial_num_active_validators + 1]
@ -267,11 +266,14 @@ mod tests {
wallclock: now,
}),
);
let result = test_state.slots_aggregate.get_aggregate_result();
expected_active_percent =
let expected_active_percent =
(initial_num_active_validators + 2) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0;
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
assert_eq!(
test_state.slots_aggregate.active_percent(),
expected_active_percent
);
let mut actual_slots =
Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned());
actual_slots.sort();
assert_eq!(actual_slots, test_state.last_voted_fork_slots);
@ -299,9 +301,12 @@ mod tests {
wallclock: now,
}),
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
assert_eq!(
test_state.slots_aggregate.active_percent(),
expected_active_percent
);
let mut actual_slots =
Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned());
actual_slots.sort();
assert_eq!(actual_slots, vec![root_slot + 1]);
@ -320,9 +325,12 @@ mod tests {
),
None,
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
assert_eq!(
test_state.slots_aggregate.active_percent(),
expected_active_percent
);
let mut actual_slots =
Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned());
actual_slots.sort();
assert_eq!(actual_slots, vec![root_slot + 1]);
}
@ -339,8 +347,7 @@ mod tests {
last_vote_bankhash: last_vote_bankhash.to_string(),
shred_version: SHRED_VERSION as u32,
};
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 10.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 10.0);
assert_eq!(
test_state
.slots_aggregate
@ -354,8 +361,7 @@ mod tests {
.unwrap(),
Some(record.clone()),
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 20.0);
// Now if you get the same result from Gossip again, it should be ignored.
assert_eq!(
test_state.slots_aggregate.aggregate(
@ -399,8 +405,7 @@ mod tests {
}),
);
// percentage doesn't change since it's a replace.
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 20.0);
// Record from validator with zero stake should be ignored.
assert_eq!(
@ -419,8 +424,7 @@ mod tests {
None,
);
// percentage doesn't change since the previous aggregate is ignored.
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 20.0);
}
#[test]

View File

@ -154,34 +154,37 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
.insert(from, record);
}
}
let result = last_voted_fork_slots_aggregate.get_aggregate_result();
// Because all operations on the aggregate are called from this single thread, we can
// fetch all results separately without worrying about them being out of sync. We can
// also use returned iterator without the vector changing underneath us.
let active_percent = last_voted_fork_slots_aggregate.active_percent();
let mut filtered_slots: Vec<Slot>;
{
let my_bank_forks = bank_forks.read().unwrap();
filtered_slots = result
.slots_to_repair
.into_iter()
filtered_slots = last_voted_fork_slots_aggregate
.slots_to_repair_iter()
.filter(|slot| {
if slot <= &root_slot || is_full_slots.contains(slot) {
if *slot <= &root_slot || is_full_slots.contains(*slot) {
return false;
}
let is_full = my_bank_forks
.get(*slot)
.get(**slot)
.map_or(false, |bank| bank.is_frozen());
if is_full {
is_full_slots.insert(*slot);
is_full_slots.insert(**slot);
}
!is_full
})
.cloned()
.collect();
}
filtered_slots.sort();
info!(
"Active peers: {} Slots to repair: {:?}",
result.active_percent, &filtered_slots
active_percent, &filtered_slots
);
if filtered_slots.is_empty()
&& result.active_percent > wait_for_supermajority_threshold_percent as f64
&& active_percent > wait_for_supermajority_threshold_percent as f64
{
*wen_restart_repair_slots.write().unwrap() = vec![];
break;