enhance replay partition metrics (#31010)

* enhance replay partition metrics
This commit is contained in:
Brennan 2023-04-04 19:57:09 -07:00 committed by GitHub
parent 9fb22bc0be
commit 60c4a718a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 76 additions and 31 deletions

View File

@ -149,6 +149,69 @@ struct SkippedSlotsInfo {
last_skipped_slot: u64,
}
struct PartitionInfo {
partition_start_time: Option<Instant>,
}
impl PartitionInfo {
fn new() -> Self {
Self {
partition_start_time: None,
}
}
fn update(
&mut self,
partition_detected: bool,
heaviest_slot: Slot,
last_voted_slot: Slot,
reset_bank_slot: Slot,
heaviest_fork_failures: Vec<HeaviestForkFailures>,
) {
if self.partition_start_time.is_none() && partition_detected {
warn!("PARTITION DETECTED waiting to join heaviest fork: {} last vote: {:?}, reset slot: {}",
heaviest_slot,
last_voted_slot,
reset_bank_slot,
);
datapoint_info!(
"replay_stage-partition-start",
("heaviest_slot", heaviest_slot as i64, i64),
("last_vote_slot", last_voted_slot as i64, i64),
("reset_slot", reset_bank_slot as i64, i64),
(
"heaviest_fork_failure_first",
format!("{:?}", heaviest_fork_failures.first()),
String
),
(
"heaviest_fork_failure_second",
format!("{:?}", heaviest_fork_failures.get(1)),
String
),
);
self.partition_start_time = Some(Instant::now());
} else if self.partition_start_time.is_some() && !partition_detected {
warn!(
"PARTITION resolved heaviest fork: {} last vote: {:?}, reset slot: {}",
heaviest_slot, last_voted_slot, reset_bank_slot
);
datapoint_info!(
"replay_stage-partition-resolved",
("heaviest_slot", heaviest_slot as i64, i64),
("last_vote_slot", last_voted_slot as i64, i64),
("reset_slot", reset_bank_slot as i64, i64),
(
"partition_duration_ms",
self.partition_start_time.unwrap().elapsed().as_millis() as i64,
i64
),
);
self.partition_start_time = None;
}
}
}
pub struct ReplayStageConfig {
pub vote_account: Pubkey,
pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
@ -454,7 +517,7 @@ impl ReplayStage {
);
let mut current_leader = None;
let mut last_reset = Hash::default();
let mut partition_exists = false;
let mut partition_info = PartitionInfo::new();
let mut skipped_slots_info = SkippedSlotsInfo::default();
let mut replay_timing = ReplayTiming::default();
let mut duplicate_slots_tracker = DuplicateSlotsTracker::default();
@ -736,10 +799,10 @@ impl ReplayStage {
heaviest_fork_failures
);
for r in heaviest_fork_failures {
for r in &heaviest_fork_failures {
if let HeaviestForkFailures::NoPropagatedConfirmation(slot) = r {
if let Some(latest_leader_slot) =
progress.get_latest_leader_slot_must_exist(slot)
progress.get_latest_leader_slot_must_exist(*slot)
{
progress.log_propagated_stats(latest_leader_slot, &bank_forks);
}
@ -791,7 +854,7 @@ impl ReplayStage {
&drop_bank_sender,
wait_to_vote_slot,
);
};
}
voting_time.stop();
let mut reset_bank_time = Measure::start("reset_bank");
@ -865,35 +928,17 @@ impl ReplayStage {
if let Some(last_voted_slot) = tower.last_voted_slot() {
// If the current heaviest bank is not a descendant of the last voted slot,
// there must be a partition
let partition_detected = Self::is_partition_detected(
&ancestors,
last_voted_slot,
partition_info.update(
Self::is_partition_detected(
&ancestors,
last_voted_slot,
heaviest_bank.slot(),
),
heaviest_bank.slot(),
last_voted_slot,
reset_bank.slot(),
heaviest_fork_failures,
);
if !partition_exists && partition_detected {
warn!(
"PARTITION DETECTED waiting to join heaviest fork: {} last vote: {:?}, reset slot: {}",
heaviest_bank.slot(),
last_voted_slot,
reset_bank.slot(),
);
inc_new_counter_info!("replay_stage-partition_detected", 1);
datapoint_info!(
"replay_stage-partition",
("slot", reset_bank.slot() as i64, i64)
);
partition_exists = true;
} else if partition_exists && !partition_detected {
warn!(
"PARTITION resolved heaviest fork: {} last vote: {:?}, reset slot: {}",
heaviest_bank.slot(),
last_voted_slot,
reset_bank.slot()
);
partition_exists = false;
inc_new_counter_info!("replay_stage-partition_resolved", 1);
}
}
}
}