retransmit consecutive leader blocks (#22157)

This commit is contained in:
Jeff Biseda 2022-01-04 00:24:16 -08:00 committed by GitHub
parent 2b5e00d36d
commit ca8fef5855
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 232 additions and 114 deletions

View File

@ -437,6 +437,11 @@ impl ProgressMap {
.map(|fork_progress| &mut fork_progress.propagated_stats)
}
/// Looks up the propagated stats for `slot`, panicking if the slot is not
/// present in the progress map. Use when absence indicates a logic bug.
pub fn get_propagated_stats_must_exist(&self, slot: Slot) -> &PropagatedStats {
    match self.get_propagated_stats(slot) {
        Some(stats) => stats,
        None => panic!("slot={} must exist in ProgressMap", slot),
    }
}
pub fn get_fork_stats(&self, slot: Slot) -> Option<&ForkStats> {
self.progress_map
.get(&slot)
@ -449,7 +454,13 @@ impl ProgressMap {
.map(|fork_progress| &mut fork_progress.fork_stats)
}
pub fn get_retransmit_info(&mut self, slot: Slot) -> Option<&mut RetransmitInfo> {
/// Returns a shared reference to the retransmit bookkeeping for `slot`,
/// or `None` if the slot is not tracked in the progress map.
pub fn get_retransmit_info(&self, slot: Slot) -> Option<&RetransmitInfo> {
    let fork_progress = self.progress_map.get(&slot)?;
    Some(&fork_progress.retransmit_info)
}
pub fn get_retransmit_info_mut(&mut self, slot: Slot) -> Option<&mut RetransmitInfo> {
self.progress_map
.get_mut(&slot)
.map(|fork_progress| &mut fork_progress.retransmit_info)
@ -467,30 +478,13 @@ impl ProgressMap {
.and_then(|fork_progress| fork_progress.fork_stats.bank_hash)
}
pub fn is_propagated(&self, slot: Slot) -> bool {
let leader_slot_to_check = self.get_latest_leader_slot(slot);
// prev_leader_slot doesn't exist because already rooted
// or this validator hasn't been scheduled as a leader
// yet. In both cases the latest leader is vacuously
// confirmed
leader_slot_to_check
.map(|leader_slot_to_check| {
// If the leader's stats are None (isn't in the
// progress map), this means that prev_leader slot is
// rooted, so return true
self.get_propagated_stats(leader_slot_to_check)
.map(|stats| stats.is_propagated)
.unwrap_or(true)
})
.unwrap_or(true)
/// Reports whether `slot` has confirmed propagation; `None` when the slot
/// is no longer (or not yet) tracked in the progress map.
pub fn is_propagated(&self, slot: Slot) -> Option<bool> {
    let stats = self.get_propagated_stats(slot)?;
    Some(stats.is_propagated)
}
pub fn get_latest_leader_slot(&self, slot: Slot) -> Option<Slot> {
let propagated_stats = self
.get_propagated_stats(slot)
.expect("All frozen banks must exist in the Progress map");
pub fn get_latest_leader_slot_must_exist(&self, slot: Slot) -> Option<Slot> {
let propagated_stats = self.get_propagated_stats_must_exist(slot);
if propagated_stats.is_leader_slot {
Some(slot)
} else {
@ -498,6 +492,24 @@ impl ProgressMap {
}
}
/// Resolves the latest leader slot at or before `slot` and whether that
/// leader slot's propagation is confirmed. Panics if `slot` itself is
/// missing from the progress map.
pub fn get_leader_propagation_slot_must_exist(&self, slot: Slot) -> (bool, Option<Slot>) {
    match self.get_latest_leader_slot_must_exist(slot) {
        Some(leader_slot) => {
            // A leader slot absent from the progress map has been rooted,
            // so its propagation is considered confirmed (`unwrap_or(true)`).
            let confirmed = self.is_propagated(leader_slot).unwrap_or(true);
            (confirmed, Some(leader_slot))
        }
        // No prior leader slot exists: either it was already rooted or this
        // validator has never been scheduled as leader. Either way the
        // latest leader is vacuously confirmed.
        None => (true, None),
    }
}
pub fn my_latest_landed_vote(&self, slot: Slot) -> Option<Slot> {
self.progress_map
.get(&slot)
@ -723,27 +735,27 @@ mod test {
);
// None of these slot have parents which are confirmed
assert!(!progress_map.is_propagated(9));
assert!(!progress_map.is_propagated(10));
assert!(!progress_map.get_leader_propagation_slot_must_exist(9).0);
assert!(!progress_map.get_leader_propagation_slot_must_exist(10).0);
// Insert new ForkProgress for slot 8 with no previous leader.
// The previous leader before 8, slot 7, does not exist in
// progress map, so is_propagated(8) should return true as
// this implies the parent is rooted
progress_map.insert(8, ForkProgress::new(Hash::default(), Some(7), None, 0, 0));
assert!(progress_map.is_propagated(8));
assert!(progress_map.get_leader_propagation_slot_must_exist(8).0);
// If we set the is_propagated = true, is_propagated should return true
progress_map
.get_propagated_stats_mut(9)
.unwrap()
.is_propagated = true;
assert!(progress_map.is_propagated(9));
assert!(progress_map.get_leader_propagation_slot_must_exist(9).0);
assert!(progress_map.get(&9).unwrap().propagated_stats.is_propagated);
// Because slot 9 is now confirmed, then slot 10 is also confirmed b/c 9
// is the last leader slot before 10
assert!(progress_map.is_propagated(10));
assert!(progress_map.get_leader_propagation_slot_must_exist(10).0);
// If we make slot 10 a leader slot though, even though its previous
// leader slot 9 has been confirmed, slot 10 itself is not confirmed
@ -751,6 +763,6 @@ mod test {
.get_propagated_stats_mut(10)
.unwrap()
.is_leader_slot = true;
assert!(!progress_map.is_propagated(10));
assert!(!progress_map.get_leader_propagation_slot_must_exist(10).0);
}
}

View File

@ -35,6 +35,7 @@ use {
blockstore::Blockstore,
blockstore_processor::{self, BlockstoreProcessorError, TransactionStatusSender},
leader_schedule_cache::LeaderScheduleCache,
leader_schedule_utils::first_of_consecutive_leader_slots,
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_info,
@ -606,7 +607,7 @@ impl ReplayStage {
for r in heaviest_fork_failures {
if let HeaviestForkFailures::NoPropagatedConfirmation(slot) = r {
if let Some(latest_leader_slot) =
progress.get_latest_leader_slot(slot)
progress.get_latest_leader_slot_must_exist(slot)
{
progress.log_propagated_stats(latest_leader_slot, &bank_forks);
}
@ -850,36 +851,66 @@ impl ReplayStage {
}
}
/// Requests retransmission for every unpropagated slot in the consecutive
/// leader group ending at `latest_leader_slot`, rate-limited per slot by
/// its retransmit backoff threshold.
fn maybe_retransmit_unpropagated_slots(
    metric_name: &'static str,
    retransmit_slots_sender: &RetransmitSlotsSender,
    progress: &mut ProgressMap,
    latest_leader_slot: Slot,
) {
    let first_leader_group_slot = first_of_consecutive_leader_slots(latest_leader_slot);
    for slot in first_leader_group_slot..=latest_leader_slot {
        let is_propagated = progress.is_propagated(slot);
        // Slots missing from the progress map are skipped entirely.
        let retransmit_info = match progress.get_retransmit_info_mut(slot) {
            Some(info) => info,
            None => continue,
        };
        if is_propagated
            .expect("presence of retransmit_info ensures that propagation status is present")
        {
            // Already propagated; nothing to retransmit for this slot.
            continue;
        }
        if !retransmit_info.reached_retransmit_threshold() {
            // Backoff window has not elapsed yet for this slot.
            debug!(
                "Bypass retransmit of slot={} retransmit_info={:?}",
                slot, &retransmit_info
            );
            continue;
        }
        info!(
            "Retrying retransmit: latest_leader_slot={} slot={} retransmit_info={:?}",
            latest_leader_slot, slot, &retransmit_info,
        );
        datapoint_info!(
            metric_name,
            ("latest_leader_slot", latest_leader_slot, i64),
            ("slot", slot, i64),
            ("retry_iteration", retransmit_info.retry_iteration, i64),
        );
        // Send failures are intentionally ignored (best effort).
        let _ = retransmit_slots_sender.send(slot);
        retransmit_info.increment_retry_iteration();
    }
}
fn retransmit_latest_unpropagated_leader_slot(
poh_recorder: &Arc<Mutex<PohRecorder>>,
retransmit_slots_sender: &RetransmitSlotsSender,
progress: &mut ProgressMap,
) {
let start_slot = poh_recorder.lock().unwrap().start_slot();
if let Some(latest_leader_slot) = progress.get_latest_leader_slot(start_slot) {
if !progress
.get_propagated_stats(latest_leader_slot)
.map(|stats| stats.is_propagated)
.unwrap_or(true)
{
warn!("Slot not propagated: slot={}", latest_leader_slot);
let retransmit_info = progress.get_retransmit_info(latest_leader_slot).unwrap();
if retransmit_info.reached_retransmit_threshold() {
info!(
"Retrying retransmit: start_slot={} latest_leader_slot={} retransmit_info={:?}",
start_slot, latest_leader_slot, &retransmit_info,
);
datapoint_info!(
"replay_stage-retransmit-timing-based",
("slot", latest_leader_slot, i64),
("retry_iteration", retransmit_info.retry_iteration, i64),
);
let _ = retransmit_slots_sender.send(latest_leader_slot);
retransmit_info.increment_retry_iteration();
} else {
info!("Bypass retry: {:?}", &retransmit_info);
}
}
if let (false, Some(latest_leader_slot)) =
progress.get_leader_propagation_slot_must_exist(start_slot)
{
debug!(
"Slot not propagated: start_slot={} latest_leader_slot={}",
start_slot, latest_leader_slot
);
Self::maybe_retransmit_unpropagated_slots(
"replay_stage-retransmit-timing-based",
retransmit_slots_sender,
progress,
latest_leader_slot,
);
}
}
@ -1372,7 +1403,9 @@ impl ReplayStage {
// `poh_slot` and `parent_slot`, because they're in the same
// `NUM_CONSECUTIVE_LEADER_SLOTS` block, we still skip the propagated
// check because it's still within the propagation grace period.
if let Some(latest_leader_slot) = progress_map.get_latest_leader_slot(parent_slot) {
if let Some(latest_leader_slot) =
progress_map.get_latest_leader_slot_must_exist(parent_slot)
{
let skip_propagated_check =
poh_slot - latest_leader_slot < NUM_CONSECUTIVE_LEADER_SLOTS;
if skip_propagated_check {
@ -1384,7 +1417,9 @@ impl ReplayStage {
// propagation of `parent_slot`, it checks propagation of the latest ancestor
// of `parent_slot` (hence the call to `get_latest_leader_slot()` in the
// check above)
progress_map.is_propagated(parent_slot)
progress_map
.get_leader_propagation_slot_must_exist(parent_slot)
.0
}
fn should_retransmit(poh_slot: Slot, last_retransmit_slot: &mut Slot) -> bool {
@ -1469,7 +1504,7 @@ impl ReplayStage {
);
if !Self::check_propagation_for_start_leader(poh_slot, parent_slot, progress_map) {
let latest_unconfirmed_leader_slot = progress_map.get_latest_leader_slot(parent_slot)
let latest_unconfirmed_leader_slot = progress_map.get_latest_leader_slot_must_exist(parent_slot)
.expect("In order for propagated check to fail, latest leader must exist in progress map");
if poh_slot != skipped_slots_info.last_skipped_slot {
datapoint_info!(
@ -1485,29 +1520,13 @@ impl ReplayStage {
progress_map.log_propagated_stats(latest_unconfirmed_leader_slot, bank_forks);
skipped_slots_info.last_skipped_slot = poh_slot;
}
if Self::should_retransmit(poh_slot, &mut skipped_slots_info.last_retransmit_slot) {
let retransmit_info = progress_map
.get_retransmit_info(latest_unconfirmed_leader_slot)
.unwrap();
if retransmit_info.reached_retransmit_threshold() {
info!(
"Retrying retransmit: retransmit_info={:?}",
&retransmit_info
);
datapoint_info!(
"replay_stage-retransmit",
("slot", latest_unconfirmed_leader_slot, i64),
("retry_iteration", retransmit_info.retry_iteration, i64),
);
let _ = retransmit_slots_sender.send(latest_unconfirmed_leader_slot);
retransmit_info.increment_retry_iteration();
} else {
info!(
"Bypassing retransmit of my leader slot retransmit_info={:?}",
&retransmit_info
);
}
Self::maybe_retransmit_unpropagated_slots(
"replay_stage-retransmit",
retransmit_slots_sender,
progress_map,
latest_unconfirmed_leader_slot,
);
}
return;
}
@ -2345,38 +2364,24 @@ impl ReplayStage {
cluster_slots: &ClusterSlots,
) {
// If propagation has already been confirmed, return
if progress.is_propagated(slot) {
if progress.get_leader_propagation_slot_must_exist(slot).0 {
return;
}
// Otherwise we have to check the votes for confirmation
let mut slot_vote_tracker = progress
.get_propagated_stats(slot)
.expect("All frozen banks must exist in the Progress map")
.slot_vote_tracker
.clone();
let mut propagated_stats = progress
.get_propagated_stats_mut(slot)
.unwrap_or_else(|| panic!("slot={} must exist in ProgressMap", slot));
if slot_vote_tracker.is_none() {
slot_vote_tracker = vote_tracker.get_slot_vote_tracker(slot);
progress
.get_propagated_stats_mut(slot)
.expect("All frozen banks must exist in the Progress map")
.slot_vote_tracker = slot_vote_tracker.clone();
if propagated_stats.slot_vote_tracker.is_none() {
propagated_stats.slot_vote_tracker = vote_tracker.get_slot_vote_tracker(slot);
}
let slot_vote_tracker = propagated_stats.slot_vote_tracker.clone();
let mut cluster_slot_pubkeys = progress
.get_propagated_stats(slot)
.expect("All frozen banks must exist in the Progress map")
.cluster_slot_pubkeys
.clone();
if cluster_slot_pubkeys.is_none() {
cluster_slot_pubkeys = cluster_slots.lookup(slot);
progress
.get_propagated_stats_mut(slot)
.expect("All frozen banks must exist in the Progress map")
.cluster_slot_pubkeys = cluster_slot_pubkeys.clone();
if propagated_stats.cluster_slot_pubkeys.is_none() {
propagated_stats.cluster_slot_pubkeys = cluster_slots.lookup(slot);
}
let cluster_slot_pubkeys = propagated_stats.cluster_slot_pubkeys.clone();
let newly_voted_pubkeys = slot_vote_tracker
.as_ref()
@ -2538,7 +2543,10 @@ impl ReplayStage {
)
};
let propagation_confirmed = is_leader_slot || progress.is_propagated(bank.slot());
let propagation_confirmed = is_leader_slot
|| progress
.get_leader_propagation_slot_must_exist(bank.slot())
.0;
if is_locked_out {
failure_reasons.push(HeaviestForkFailures::LockedOut(bank.slot()));
@ -2584,7 +2592,7 @@ impl ReplayStage {
fork_tip: Slot,
bank_forks: &RwLock<BankForks>,
) {
let mut current_leader_slot = progress.get_latest_leader_slot(fork_tip);
let mut current_leader_slot = progress.get_latest_leader_slot_must_exist(fork_tip);
let mut did_newly_reach_threshold = false;
let root = bank_forks.read().unwrap().root();
loop {
@ -2951,6 +2959,7 @@ pub mod tests {
use {
super::*,
crate::{
broadcast_stage::RetransmitSlotsReceiver,
consensus::Tower,
progress_map::{ValidatorStakeInfo, RETRANSMIT_BASE_DELAY_MS},
replay_stage::ReplayStage,
@ -4388,7 +4397,7 @@ pub mod tests {
// Make sure is_propagated == false so that the propagation logic
// runs in `update_propagation_status`
assert!(!progress_map.is_propagated(10));
assert!(!progress_map.get_leader_propagation_slot_must_exist(10).0);
let vote_tracker = VoteTracker::new(&bank_forks.root_bank());
vote_tracker.insert_vote(10, vote_pubkey);
@ -5015,7 +5024,11 @@ pub mod tests {
vote_tracker.insert_vote(root_bank.slot(), *vote_key);
}
assert!(!progress.is_propagated(root_bank.slot()));
assert!(
!progress
.get_leader_propagation_slot_must_exist(root_bank.slot())
.0
);
// Update propagation status
let tower = Tower::new_for_tests(0, 0.67);
@ -5033,7 +5046,11 @@ pub mod tests {
);
// Check status is true
assert!(progress.is_propagated(root_bank.slot()));
assert!(
progress
.get_leader_propagation_slot_must_exist(root_bank.slot())
.0
);
}
#[test]
@ -5984,7 +6001,7 @@ pub mod tests {
"retry_iteration=0, elapsed < 2^0 * RETRANSMIT_BASE_DELAY_MS"
);
progress.get_retransmit_info(0).unwrap().retry_time =
progress.get_retransmit_info_mut(0).unwrap().retry_time =
Some(Instant::now() - Duration::from_millis(RETRANSMIT_BASE_DELAY_MS + 1));
ReplayStage::retransmit_latest_unpropagated_leader_slot(
&poh_recorder,
@ -6013,7 +6030,7 @@ pub mod tests {
"retry_iteration=1, elapsed < 2^1 * RETRY_BASE_DELAY_MS"
);
progress.get_retransmit_info(0).unwrap().retry_time =
progress.get_retransmit_info_mut(0).unwrap().retry_time =
Some(Instant::now() - Duration::from_millis(RETRANSMIT_BASE_DELAY_MS + 1));
ReplayStage::retransmit_latest_unpropagated_leader_slot(
&poh_recorder,
@ -6026,7 +6043,7 @@ pub mod tests {
"retry_iteration=1, elapsed < 2^1 * RETRANSMIT_BASE_DELAY_MS"
);
progress.get_retransmit_info(0).unwrap().retry_time =
progress.get_retransmit_info_mut(0).unwrap().retry_time =
Some(Instant::now() - Duration::from_millis(2 * RETRANSMIT_BASE_DELAY_MS + 1));
ReplayStage::retransmit_latest_unpropagated_leader_slot(
&poh_recorder,
@ -6046,11 +6063,11 @@ pub mod tests {
// increment to retry iteration 3
progress
.get_retransmit_info(0)
.get_retransmit_info_mut(0)
.unwrap()
.increment_retry_iteration();
progress.get_retransmit_info(0).unwrap().retry_time =
progress.get_retransmit_info_mut(0).unwrap().retry_time =
Some(Instant::now() - Duration::from_millis(2 * RETRANSMIT_BASE_DELAY_MS + 1));
ReplayStage::retransmit_latest_unpropagated_leader_slot(
&poh_recorder,
@ -6063,7 +6080,7 @@ pub mod tests {
"retry_iteration=3, elapsed < 2^3 * RETRANSMIT_BASE_DELAY_MS"
);
progress.get_retransmit_info(0).unwrap().retry_time =
progress.get_retransmit_info_mut(0).unwrap().retry_time =
Some(Instant::now() - Duration::from_millis(8 * RETRANSMIT_BASE_DELAY_MS + 1));
ReplayStage::retransmit_latest_unpropagated_leader_slot(
&poh_recorder,
@ -6082,6 +6099,91 @@ pub mod tests {
);
}
/// Drains all currently queued slots from the receiver; a short 10ms
/// timeout bounds the wait once the channel runs dry.
fn receive_slots(retransmit_slots_receiver: &RetransmitSlotsReceiver) -> Vec<Slot> {
    std::iter::from_fn(|| {
        retransmit_slots_receiver
            .recv_timeout(Duration::from_millis(10))
            .ok()
    })
    .collect()
}
#[test]
fn test_maybe_retransmit_unpropagated_slots() {
    // Build replay components with a 10-validator setup so leader slots can
    // be assigned from the schedule below.
    let ReplayBlockstoreComponents {
        validator_node_to_vote_keys,
        leader_schedule_cache,
        vote_simulator,
        ..
    } = replay_blockstore_components(None, 10, None::<GenerateVotes>);

    let VoteSimulator {
        mut progress,
        ref bank_forks,
        ..
    } = vote_simulator;

    let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
    let mut prev_index = 0;
    // Populate progress/bank_forks for slots 1..=9 and 11..=14, deliberately
    // leaving a gap at slot 10 to exercise the discontinuity case below.
    // None of these slots are marked propagated, so all are retransmit
    // candidates.
    for i in (1..10).chain(11..15) {
        let bank = Bank::new_from_parent(
            bank_forks.read().unwrap().get(prev_index).unwrap(),
            &leader_schedule_cache.slot_leader_at(i, None).unwrap(),
            i,
        );
        progress.insert(
            i,
            ForkProgress::new_from_bank(
                &bank,
                bank.collector_id(),
                validator_node_to_vote_keys
                    .get(bank.collector_id())
                    .unwrap(),
                Some(0),
                0,
                0,
            ),
        );
        assert!(progress.get_propagated_stats(i).unwrap().is_leader_slot);
        bank.freeze();
        bank_forks.write().unwrap().insert(bank);
        prev_index = i;
    }

    // expect single slot when latest_leader_slot is the start of a consecutive range
    let latest_leader_slot = 0;
    ReplayStage::maybe_retransmit_unpropagated_slots(
        "test",
        &retransmit_slots_sender,
        &mut progress,
        latest_leader_slot,
    );
    let received_slots = receive_slots(&retransmit_slots_receiver);
    assert_eq!(received_slots, vec![0]);

    // expect range of slots from start of consecutive slots
    // (expected [4, 5, 6] implies NUM_CONSECUTIVE_LEADER_SLOTS == 4 here)
    let latest_leader_slot = 6;
    ReplayStage::maybe_retransmit_unpropagated_slots(
        "test",
        &retransmit_slots_sender,
        &mut progress,
        latest_leader_slot,
    );
    let received_slots = receive_slots(&retransmit_slots_receiver);
    assert_eq!(received_slots, vec![4, 5, 6]);

    // expect range of slots skipping a discontinuity in the range
    // (slot 10 was never inserted, so only 8, 9, 11 are sent)
    let latest_leader_slot = 11;
    ReplayStage::maybe_retransmit_unpropagated_slots(
        "test",
        &retransmit_slots_sender,
        &mut progress,
        latest_leader_slot,
    );
    let received_slots = receive_slots(&retransmit_slots_receiver);
    assert_eq!(received_slots, vec![8, 9, 11]);
}
fn run_compute_and_select_forks(
bank_forks: &RwLock<BankForks>,
progress: &mut ProgressMap,

View File

@ -61,6 +61,10 @@ pub fn num_ticks_left_in_slot(bank: &Bank, tick_height: u64) -> u64 {
bank.ticks_per_slot() - tick_height % bank.ticks_per_slot()
}
/// Rounds `slot` down to the first slot of its consecutive-leader group,
/// i.e. the nearest multiple of `NUM_CONSECUTIVE_LEADER_SLOTS` at or below it.
pub fn first_of_consecutive_leader_slots(slot: Slot) -> Slot {
    slot - slot % NUM_CONSECUTIVE_LEADER_SLOTS
}
fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
// Sort first by stake. If stakes are the same, sort by pubkey to ensure a
// deterministic result.