time based retransmit in replay_stage (#21498)

This commit is contained in:
Jeff Biseda 2021-12-17 05:44:40 -08:00 committed by GitHub
parent 66fa8f9667
commit 7ec39f5a1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 269 additions and 10 deletions

View File

@ -11,6 +11,7 @@ use {
std::{
collections::{BTreeMap, HashMap, HashSet},
sync::{Arc, RwLock},
time::Instant,
},
};
@ -188,12 +189,39 @@ impl ValidatorStakeInfo {
}
}
/// Base delay, in milliseconds, that must elapse before a retransmit is retried.
/// The effective delay is this value multiplied by `2^retry_iteration` (capped).
pub const RETRANSMIT_BASE_DELAY_MS: u64 = 5_000;
/// Largest exponent applied to the base delay; the backoff stops growing at
/// `2^RETRANSMIT_BACKOFF_CAP * RETRANSMIT_BASE_DELAY_MS`.
pub const RETRANSMIT_BACKOFF_CAP: u32 = 6;

/// Retry bookkeeping used to rate-limit retransmits with a capped exponential backoff.
#[derive(Debug, Default)]
pub struct RetransmitInfo {
    /// When the most recent retransmit was recorded; `None` until the first one.
    pub retry_time: Option<Instant>,
    /// Number of retransmits recorded after the first one; used as the backoff exponent.
    pub retry_iteration: u32,
}

impl RetransmitInfo {
    /// Returns `true` when a retransmit is allowed right now.
    ///
    /// That is the case when no retransmit has been recorded yet, or when strictly
    /// more than `2^min(retry_iteration, RETRANSMIT_BACKOFF_CAP) * RETRANSMIT_BASE_DELAY_MS`
    /// whole milliseconds have elapsed since `retry_time`.
    pub fn reached_retransmit_threshold(&self) -> bool {
        // Capping the exponent keeps `2^backoff * base` far below `u64::MAX`.
        let backoff = self.retry_iteration.min(RETRANSMIT_BACKOFF_CAP);
        let backoff_duration_ms = 2_u64.pow(backoff) * RETRANSMIT_BASE_DELAY_MS;
        self.retry_time.map_or(true, |time| {
            time.elapsed().as_millis() > u128::from(backoff_duration_ms)
        })
    }

    /// Records that a retransmit happened now.
    ///
    /// The very first call only stamps `retry_time`; every later call also bumps
    /// `retry_iteration`, which lengthens the next backoff window.
    pub fn increment_retry_iteration(&mut self) {
        if self.retry_time.is_some() {
            // Saturate rather than overflow: a wrap to 0 would silently reset the
            // backoff (and a debug build would panic).
            self.retry_iteration = self.retry_iteration.saturating_add(1);
        }
        self.retry_time = Some(Instant::now());
    }
}
pub struct ForkProgress {
pub is_dead: bool,
pub fork_stats: ForkStats,
pub propagated_stats: PropagatedStats,
pub replay_stats: ReplaySlotStats,
pub replay_progress: ConfirmationProgress,
pub retransmit_info: RetransmitInfo,
// Note `num_blocks_on_fork` and `num_dropped_blocks_on_fork` only
// count new blocks replayed since last restart, which won't include
// blocks already existing in the ledger/before snapshot at start,
@ -251,6 +279,7 @@ impl ForkProgress {
total_epoch_stake,
..PropagatedStats::default()
},
retransmit_info: RetransmitInfo::default(),
}
}
@ -409,6 +438,12 @@ impl ProgressMap {
.map(|fork_progress| &mut fork_progress.fork_stats)
}
/// Returns a mutable handle to the retransmit bookkeeping stored for `slot`,
/// or `None` when the progress map has no entry for that slot.
pub fn get_retransmit_info(&mut self, slot: Slot) -> Option<&mut RetransmitInfo> {
    let fork_progress = self.progress_map.get_mut(&slot)?;
    Some(&mut fork_progress.retransmit_info)
}
pub fn is_dead(&self, slot: Slot) -> Option<bool> {
self.progress_map
.get(&slot)

View File

@ -160,6 +160,7 @@ pub struct ReplayTiming {
process_duplicate_slots_elapsed: u64,
process_unfrozen_gossip_verified_vote_hashes_elapsed: u64,
repair_correct_slots_elapsed: u64,
retransmit_not_propagated_elapsed: u64,
}
impl ReplayTiming {
#[allow(clippy::too_many_arguments)]
@ -182,6 +183,7 @@ impl ReplayTiming {
process_unfrozen_gossip_verified_vote_hashes_elapsed: u64,
process_duplicate_slots_elapsed: u64,
repair_correct_slots_elapsed: u64,
retransmit_not_propagated_elapsed: u64,
) {
self.collect_frozen_banks_elapsed += collect_frozen_banks_elapsed;
self.compute_bank_stats_elapsed += compute_bank_stats_elapsed;
@ -202,6 +204,7 @@ impl ReplayTiming {
process_unfrozen_gossip_verified_vote_hashes_elapsed;
self.process_duplicate_slots_elapsed += process_duplicate_slots_elapsed;
self.repair_correct_slots_elapsed += repair_correct_slots_elapsed;
self.retransmit_not_propagated_elapsed += retransmit_not_propagated_elapsed;
let now = timestamp();
let elapsed_ms = now - self.last_print;
if elapsed_ms > 1000 {
@ -291,7 +294,12 @@ impl ReplayTiming {
"repair_correct_slots_elapsed",
self.repair_correct_slots_elapsed as i64,
i64
)
),
(
"retransmit_not_propagated_elapsed",
self.retransmit_not_propagated_elapsed as i64,
i64
),
);
*self = ReplayTiming::default();
@ -758,6 +766,14 @@ impl ReplayStage {
Self::dump_then_repair_correct_slots(&mut duplicate_slots_to_repair, &mut ancestors, &mut descendants, &mut progress, &bank_forks, &blockstore, poh_bank.map(|bank| bank.slot()));
dump_then_repair_correct_slots_time.stop();
let mut retransmit_not_propagated_time = Measure::start("retransmit_not_propagated_time");
Self::retransmit_latest_unpropagated_leader_slot(
&poh_recorder,
&retransmit_slots_sender,
&mut progress,
);
retransmit_not_propagated_time.stop();
// From this point on, it's not safe to use ancestors/descendants since maybe_start_leader
// may add a bank that will not be included in either of these maps.
drop(ancestors);
@ -769,7 +785,7 @@ impl ReplayStage {
&poh_recorder,
&leader_schedule_cache,
&rpc_subscriptions,
&progress,
&mut progress,
&retransmit_slots_sender,
&mut skipped_slots_info,
has_new_vote_been_rooted,
@ -819,6 +835,7 @@ impl ReplayStage {
process_unfrozen_gossip_verified_vote_hashes_time.as_us(),
process_duplicate_slots_time.as_us(),
dump_then_repair_correct_slots_time.as_us(),
retransmit_not_propagated_time.as_us(),
);
}
})
@ -830,6 +847,35 @@ impl ReplayStage {
}
}
/// Requests a retransmit of the latest leader slot if it has not been propagated
/// and its backoff window has elapsed.
///
/// The PoH recorder's start slot is used to look up the latest leader slot in
/// `progress`. When that slot is not propagated and its `RetransmitInfo` reports
/// that the threshold was reached, the slot is sent on `retransmit_slots_sender`
/// (a send error is ignored) and the retry is recorded. Otherwise the retry is
/// skipped and only logged.
///
/// # Panics
///
/// Panics if the PoH recorder lock is poisoned, or if `progress` has no
/// retransmit info for the latest leader slot.
fn retransmit_latest_unpropagated_leader_slot(
    poh_recorder: &Arc<Mutex<PohRecorder>>,
    retransmit_slots_sender: &RetransmitSlotsSender,
    progress: &mut ProgressMap,
) {
    let start_slot = poh_recorder.lock().unwrap().start_slot();

    // Nothing to do without a known leader slot, or once it has propagated.
    let latest_leader_slot = match progress.get_latest_leader_slot(start_slot) {
        Some(slot) => slot,
        None => return,
    };
    if progress.is_propagated(latest_leader_slot) {
        return;
    }

    warn!("Slot not propagated: slot={}", latest_leader_slot);
    let retransmit_info = progress.get_retransmit_info(latest_leader_slot).unwrap();

    // Still inside the backoff window: log and wait for a later pass.
    if !retransmit_info.reached_retransmit_threshold() {
        info!("Bypass retry: {:?}", &retransmit_info);
        return;
    }

    info!(
        "Retrying retransmit: start_slot={} latest_leader_slot={} retransmit_info={:?}",
        start_slot, latest_leader_slot, &retransmit_info,
    );
    datapoint_info!(
        "replay_stage-retransmit-timing-based",
        ("slot", latest_leader_slot, i64),
        ("retry_iteration", retransmit_info.retry_iteration, i64),
    );
    let _ = retransmit_slots_sender.send(latest_leader_slot);
    retransmit_info.increment_retry_iteration();
}
fn is_partition_detected(
ancestors: &HashMap<Slot, HashSet<Slot>>,
last_voted_slot: Slot,
@ -1352,7 +1398,7 @@ impl ReplayStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
progress_map: &ProgressMap,
progress_map: &mut ProgressMap,
retransmit_slots_sender: &RetransmitSlotsSender,
skipped_slots_info: &mut SkippedSlotsInfo,
has_new_vote_been_rooted: bool,
@ -1433,13 +1479,28 @@ impl ReplayStage {
skipped_slots_info.last_skipped_slot = poh_slot;
}
// Signal retransmit
if Self::should_retransmit(poh_slot, &mut skipped_slots_info.last_retransmit_slot) {
datapoint_info!(
"replay_stage-retransmit",
("slot", latest_unconfirmed_leader_slot, i64),
);
let _ = retransmit_slots_sender.send(latest_unconfirmed_leader_slot);
let retransmit_info = progress_map
.get_retransmit_info(latest_unconfirmed_leader_slot)
.unwrap();
if retransmit_info.reached_retransmit_threshold() {
info!(
"Retrying retransmit: retransmit_info={:?}",
&retransmit_info
);
datapoint_info!(
"replay_stage-retransmit",
("slot", latest_unconfirmed_leader_slot, i64),
("retry_iteration", retransmit_info.retry_iteration, i64),
);
let _ = retransmit_slots_sender.send(latest_unconfirmed_leader_slot);
retransmit_info.increment_retry_iteration();
} else {
info!(
"Bypassing retransmit of my leader slot retransmit_info={:?}",
&retransmit_info
);
}
}
return;
}
@ -2873,7 +2934,7 @@ pub mod tests {
super::*,
crate::{
consensus::Tower,
progress_map::ValidatorStakeInfo,
progress_map::{ValidatorStakeInfo, RETRANSMIT_BASE_DELAY_MS},
replay_stage::ReplayStage,
tree_diff::TreeDiff,
vote_simulator::{self, VoteSimulator},
@ -5834,6 +5895,169 @@ pub mod tests {
assert_eq!(tower.last_voted_slot().unwrap(), 1);
}
#[test]
fn test_retransmit_latest_unpropagated_leader_slot() {
    // Walks `retransmit_latest_unpropagated_leader_slot` through several backoff
    // windows by rewinding `retry_time`, checking after each call whether a slot
    // was sent on the retransmit channel and how `retry_iteration` moved.
    let ReplayBlockstoreComponents {
        validator_node_to_vote_keys,
        leader_schedule_cache,
        poh_recorder,
        vote_simulator,
        ..
    } = replay_blockstore_components(None, 10, None::<GenerateVotes>);

    let VoteSimulator {
        mut progress,
        ref bank_forks,
        ..
    } = vote_simulator;

    let poh_recorder = Arc::new(poh_recorder);
    let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();

    // Register a frozen bank for slot 1 as one of our own leader slots.
    let bank1 = Bank::new_from_parent(
        bank_forks.read().unwrap().get(0).unwrap(),
        &leader_schedule_cache.slot_leader_at(1, None).unwrap(),
        1,
    );
    progress.insert(
        1,
        ForkProgress::new_from_bank(
            &bank1,
            bank1.collector_id(),
            validator_node_to_vote_keys
                .get(bank1.collector_id())
                .unwrap(),
            Some(0),
            0,
            0,
        ),
    );
    assert!(progress.get_propagated_stats(1).unwrap().is_leader_slot);
    bank1.freeze();
    bank_forks.write().unwrap().insert(bank1);

    // NOTE(review): the assertions below read the retransmit info of slot 0, not
    // slot 1 — presumably slot 0 is the latest leader slot resolved from the PoH
    // recorder's start slot; confirm against `get_latest_leader_slot`.

    // No retry_time yet: the first call retransmits and only stamps the time.
    ReplayStage::retransmit_latest_unpropagated_leader_slot(
        &poh_recorder,
        &retransmit_slots_sender,
        &mut progress,
    );
    let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
    assert!(res.is_ok(), "retry_iteration=0, retry_time=None");
    assert_eq!(
        progress.get_retransmit_info(0).unwrap().retry_iteration,
        0,
        "retransmit should not advance retry_iteration before time has been set"
    );

    // Immediately afterwards the base delay has not elapsed: nothing is sent.
    ReplayStage::retransmit_latest_unpropagated_leader_slot(
        &poh_recorder,
        &retransmit_slots_sender,
        &mut progress,
    );
    let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
    assert!(
        res.is_err(),
        "retry_iteration=0, elapsed < 2^0 * RETRANSMIT_BASE_DELAY_MS"
    );

    // Rewind retry_time just past the base delay: retransmit, iteration becomes 1.
    progress.get_retransmit_info(0).unwrap().retry_time =
        Some(Instant::now() - Duration::from_millis(RETRANSMIT_BASE_DELAY_MS + 1));
    ReplayStage::retransmit_latest_unpropagated_leader_slot(
        &poh_recorder,
        &retransmit_slots_sender,
        &mut progress,
    );
    let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
    assert!(
        res.is_ok(),
        "retry_iteration=0, elapsed > RETRANSMIT_BASE_DELAY_MS"
    );
    assert_eq!(
        progress.get_retransmit_info(0).unwrap().retry_iteration,
        1,
        "retransmit should advance retry_iteration"
    );

    // retry_time was just refreshed, so the doubled window has not elapsed.
    ReplayStage::retransmit_latest_unpropagated_leader_slot(
        &poh_recorder,
        &retransmit_slots_sender,
        &mut progress,
    );
    let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
    assert!(
        res.is_err(),
        "retry_iteration=1, elapsed < 2^1 * RETRANSMIT_BASE_DELAY_MS"
    );

    // One base delay is no longer enough at iteration 1.
    progress.get_retransmit_info(0).unwrap().retry_time =
        Some(Instant::now() - Duration::from_millis(RETRANSMIT_BASE_DELAY_MS + 1));
    ReplayStage::retransmit_latest_unpropagated_leader_slot(
        &poh_recorder,
        &retransmit_slots_sender,
        &mut progress,
    );
    let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
    assert!(
        res.is_err(),
        "retry_iteration=1, elapsed < 2^1 * RETRANSMIT_BASE_DELAY_MS"
    );

    // Just past two base delays: retransmit, iteration becomes 2.
    progress.get_retransmit_info(0).unwrap().retry_time =
        Some(Instant::now() - Duration::from_millis(2 * RETRANSMIT_BASE_DELAY_MS + 1));
    ReplayStage::retransmit_latest_unpropagated_leader_slot(
        &poh_recorder,
        &retransmit_slots_sender,
        &mut progress,
    );
    let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
    assert!(
        res.is_ok(),
        "retry_iteration=1, elapsed > 2^1 * RETRANSMIT_BASE_DELAY_MS"
    );
    assert_eq!(
        progress.get_retransmit_info(0).unwrap().retry_iteration,
        2,
        "retransmit should advance retry_iteration"
    );

    // increment to retry iteration 3
    progress
        .get_retransmit_info(0)
        .unwrap()
        .increment_retry_iteration();

    // Two base delays are well short of the 2^3 window required at iteration 3.
    progress.get_retransmit_info(0).unwrap().retry_time =
        Some(Instant::now() - Duration::from_millis(2 * RETRANSMIT_BASE_DELAY_MS + 1));
    ReplayStage::retransmit_latest_unpropagated_leader_slot(
        &poh_recorder,
        &retransmit_slots_sender,
        &mut progress,
    );
    let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
    assert!(
        res.is_err(),
        "retry_iteration=3, elapsed < 2^3 * RETRANSMIT_BASE_DELAY_MS"
    );

    // Just past eight base delays: retransmit, iteration becomes 4.
    progress.get_retransmit_info(0).unwrap().retry_time =
        Some(Instant::now() - Duration::from_millis(8 * RETRANSMIT_BASE_DELAY_MS + 1));
    ReplayStage::retransmit_latest_unpropagated_leader_slot(
        &poh_recorder,
        &retransmit_slots_sender,
        &mut progress,
    );
    let res = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10));
    assert!(
        res.is_ok(),
        "retry_iteration=3, elapsed > 2^3 * RETRANSMIT_BASE_DELAY_MS"
    );
    assert_eq!(
        progress.get_retransmit_info(0).unwrap().retry_iteration,
        4,
        "retransmit should advance retry_iteration"
    );
}
fn run_compute_and_select_forks(
bank_forks: &RwLock<BankForks>,
progress: &mut ProgressMap,