Add Metrics/Dashboards tracking block production (#9342)

* Add metric tracking blocks/dropped blocks

Co-authored-by: Carl <carl@solana.com>
carllin 2020-04-08 14:35:24 -07:00 committed by GitHub
parent 36e73cada4
commit 4522e85ac4
6 changed files with 903 additions and 667 deletions
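
The core of the change: each ForkProgress entry now carries running totals of blocks replayed and blocks dropped (skipped) on its fork, seeded from the parent's entry when a new bank is replayed, and reported via datapoint_info!("blocks_produced", ...) whenever PoH is reset onto a fork. A minimal standalone sketch of the counting rule follows; the ForkCounters/extend_fork names are illustrative only, not part of this commit:

#[derive(Clone, Copy, Debug, Default)]
struct ForkCounters {
    num_blocks_on_fork: u64,
    num_dropped_blocks_on_fork: u64,
}

// Replaying a bank at `slot` whose parent is `parent_slot` adds one produced
// block and `slot - parent_slot - 1` skipped slots to the totals inherited
// from the parent's progress entry, mirroring the replay_stage hunk below.
fn extend_fork(parent: ForkCounters, parent_slot: u64, slot: u64) -> ForkCounters {
    assert!(slot > parent_slot);
    ForkCounters {
        num_blocks_on_fork: parent.num_blocks_on_fork + 1,
        num_dropped_blocks_on_fork: parent.num_dropped_blocks_on_fork + (slot - parent_slot - 1),
    }
}

fn main() {
    // Fork 0 -> 1 -> 4: two blocks replayed, slots 2 and 3 skipped.
    let p0 = ForkCounters::default();
    let p1 = extend_fork(p0, 0, 1);
    let p4 = extend_fork(p1, 1, 4);
    assert_eq!((p4.num_blocks_on_fork, p4.num_dropped_blocks_on_fork), (2, 2));
}

As the in-code comment in progress_map.rs notes, these counters only cover blocks replayed since the last restart, so they do not span the full history of the ledger.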


@ -566,9 +566,9 @@ pub mod test {
.expect("parent bank must exist")
.clone();
info!("parent of {} is {}", missing_slot, parent_bank.slot(),);
progress
.entry(missing_slot)
.or_insert_with(|| ForkProgress::new(parent_bank.last_blockhash(), None, None));
progress.entry(missing_slot).or_insert_with(|| {
ForkProgress::new(parent_bank.last_blockhash(), None, None, 0, 0)
});
// Create the missing bank
let new_bank =
@ -719,7 +719,10 @@ pub mod test {
bank0.freeze();
let mut progress = ProgressMap::default();
progress.insert(0, ForkProgress::new(bank0.last_blockhash(), None, None));
progress.insert(
0,
ForkProgress::new(bank0.last_blockhash(), None, None, 0, 0),
);
(BankForks::new(0, bank0), progress)
}


@ -85,6 +85,12 @@ pub(crate) struct ForkProgress {
pub(crate) propagated_stats: PropagatedStats,
pub(crate) replay_stats: ReplaySlotStats,
pub(crate) replay_progress: ConfirmationProgress,
// Note `num_blocks_on_fork` and `num_dropped_blocks_on_fork` only
// count new blocks replayed since last restart, which won't include
// blocks already existing in the ledger/before snapshot at start,
// so these stats do not span all of time
pub(crate) num_blocks_on_fork: u64,
pub(crate) num_dropped_blocks_on_fork: u64,
}
impl ForkProgress {
@ -92,6 +98,8 @@ impl ForkProgress {
last_entry: Hash,
prev_leader_slot: Option<Slot>,
validator_stake_info: Option<ValidatorStakeInfo>,
num_blocks_on_fork: u64,
num_dropped_blocks_on_fork: u64,
) -> Self {
let (
is_leader_slot,
@ -124,6 +132,8 @@ impl ForkProgress {
fork_stats: ForkStats::default(),
replay_stats: ReplaySlotStats::default(),
replay_progress: ConfirmationProgress::new(last_entry),
num_blocks_on_fork,
num_dropped_blocks_on_fork,
propagated_stats: PropagatedStats {
prev_leader_slot,
is_leader_slot,
@ -141,6 +151,8 @@ impl ForkProgress {
my_pubkey: &Pubkey,
voting_pubkey: &Pubkey,
prev_leader_slot: Option<Slot>,
num_blocks_on_fork: u64,
num_dropped_blocks_on_fork: u64,
) -> Self {
let validator_fork_info = {
if bank.collector_id() == my_pubkey {
@ -155,7 +167,13 @@ impl ForkProgress {
}
};
Self::new(bank.last_blockhash(), prev_leader_slot, validator_fork_info)
Self::new(
bank.last_blockhash(),
prev_leader_slot,
validator_fork_info,
num_blocks_on_fork,
num_dropped_blocks_on_fork,
)
}
}
@ -352,6 +370,26 @@ impl ProgressMap {
self.progress_map
.retain(|k, _| bank_forks.get(*k).is_some());
}
pub fn log_propagated_stats(&self, slot: Slot, bank_forks: &RwLock<BankForks>) {
if let Some(stats) = self.get_propagated_stats(slot) {
info!(
"Propagated stats:
total staked: {},
observed staked: {},
vote pubkeys: {:?},
node_pubkeys: {:?},
slot: {},
epoch: {:?}",
stats.total_epoch_stake,
stats.propagated_validators_stake,
stats.propagated_validators,
stats.propagated_node_ids,
slot,
bank_forks.read().unwrap().get(slot).map(|x| x.epoch()),
);
}
}
}
#[cfg(test)]
@ -476,7 +514,7 @@ mod test {
fn test_is_propagated_status_on_construction() {
// If the given ValidatorStakeInfo == None, then this is not
// a leader slot and is_propagated == false
let progress = ForkProgress::new(Hash::default(), Some(9), None);
let progress = ForkProgress::new(Hash::default(), Some(9), None, 0, 0);
assert!(!progress.propagated_stats.is_propagated);
// If the stake is zero, then threshold is always achieved
@ -487,6 +525,8 @@ mod test {
total_epoch_stake: 0,
..ValidatorStakeInfo::default()
}),
0,
0,
);
assert!(progress.propagated_stats.is_propagated);
@ -499,6 +539,8 @@ mod test {
total_epoch_stake: 2,
..ValidatorStakeInfo::default()
}),
0,
0,
);
assert!(!progress.propagated_stats.is_propagated);
@ -511,6 +553,8 @@ mod test {
total_epoch_stake: 2,
..ValidatorStakeInfo::default()
}),
0,
0,
);
assert!(progress.propagated_stats.is_propagated);
@ -521,6 +565,8 @@ mod test {
Hash::default(),
Some(9),
Some(ValidatorStakeInfo::default()),
0,
0,
);
assert!(!progress.propagated_stats.is_propagated);
}
@ -531,10 +577,16 @@ mod test {
// Insert new ForkProgress for slot 10 (not a leader slot) and its
// previous leader slot 9 (leader slot)
progress_map.insert(10, ForkProgress::new(Hash::default(), Some(9), None));
progress_map.insert(10, ForkProgress::new(Hash::default(), Some(9), None, 0, 0));
progress_map.insert(
9,
ForkProgress::new(Hash::default(), None, Some(ValidatorStakeInfo::default())),
ForkProgress::new(
Hash::default(),
None,
Some(ValidatorStakeInfo::default()),
0,
0,
),
);
// None of these slots have parents which are confirmed
@ -545,7 +597,7 @@ mod test {
// The previous leader before 8, slot 7, does not exist in
// progress map, so is_propagated(8) should return true as
// this implies the parent is rooted
progress_map.insert(8, ForkProgress::new(Hash::default(), Some(7), None));
progress_map.insert(8, ForkProgress::new(Hash::default(), Some(7), None, 0, 0));
assert!(progress_map.is_propagated(8));
// If we set the is_propagated = true, is_propagated should return true


@ -171,6 +171,8 @@ impl ReplayStage {
&my_pubkey,
&vote_account,
prev_leader_slot,
0,
0,
),
);
}
@ -281,22 +283,7 @@ impl ReplayStage {
for r in failure_reasons {
if let HeaviestForkFailures::NoPropagatedConfirmation(slot) = r {
if let Some(latest_leader_slot) = progress.get_latest_leader_slot(slot)
{
if let Some(stats) =
progress.get_propagated_stats(latest_leader_slot)
{
info!(
"total staked: {}, observed staked: {}, vote pubkeys: {:?}, node_pubkeys: {:?}, latest_leader_slot: {}, epoch: {:?}",
stats.total_epoch_stake,
stats.propagated_validators_stake,
stats.propagated_validators,
stats.propagated_node_ids,
latest_leader_slot,
bank_forks.read().unwrap().get(latest_leader_slot).map(|x| x.epoch()),
);
}
}
progress.log_propagated_stats(slot, &bank_forks);
}
}
}
@ -305,9 +292,12 @@ impl ReplayStage {
// Vote on a fork
if let Some(ref vote_bank) = vote_bank {
subscriptions.notify_subscribers(block_commitment_cache.read().unwrap().slot(), &bank_forks);
if let Some(votable_leader) = leader_schedule_cache
.slot_leader_at(vote_bank.slot(), Some(vote_bank))
subscriptions.notify_subscribers(
block_commitment_cache.read().unwrap().slot(),
&bank_forks,
);
if let Some(votable_leader) =
leader_schedule_cache.slot_leader_at(vote_bank.slot(), Some(vote_bank))
{
Self::log_leader_change(
&my_pubkey,
@ -341,13 +331,24 @@ impl ReplayStage {
// Reset onto a fork
if let Some(reset_bank) = reset_bank {
if last_reset != reset_bank.last_blockhash()
{
if last_reset != reset_bank.last_blockhash() {
info!(
"vote bank: {:?} reset bank: {:?}",
vote_bank.as_ref().map(|b| b.slot()),
reset_bank.slot(),
);
let fork_progress = progress
.get(&reset_bank.slot())
.expect("bank to reset to must exist in progress map");
datapoint_info!(
"blocks_produced",
("num_blocks_on_fork", fork_progress.num_blocks_on_fork, i64),
(
"num_dropped_blocks_on_fork",
fork_progress.num_dropped_blocks_on_fork,
i64
),
);
Self::reset_poh_recorder(
&my_pubkey,
&blockstore,
@ -575,17 +576,22 @@ impl ReplayStage {
);
if !Self::check_propagation_for_start_leader(poh_slot, parent_slot, progress_map) {
let latest_leader_slot = progress_map.get_latest_leader_slot(parent_slot).expect("In order for propagated check to fail, latest leader must exist in progress map");
let latest_unconfirmed_leader_slot = progress_map.get_latest_leader_slot(parent_slot).expect("In order for propagated check to fail, latest leader must exist in progress map");
if poh_slot != skipped_slots_info.last_skipped_slot {
datapoint_info!(
"replay_stage-skip_leader_slot",
("slot", poh_slot, i64),
("parent_slot", parent_slot, i64),
("latest_unconfirmed_leader", latest_leader_slot, i64)
(
"latest_unconfirmed_leader_slot",
latest_unconfirmed_leader_slot,
i64
)
);
progress_map.log_propagated_stats(latest_unconfirmed_leader_slot, bank_forks);
skipped_slots_info.last_skipped_slot = poh_slot;
}
let bank = bank_forks.read().unwrap().get(latest_leader_slot)
let bank = bank_forks.read().unwrap().get(latest_unconfirmed_leader_slot)
.expect("In order for propagated check to fail, latest leader must exist in progress map, and thus also in BankForks").clone();
// Signal retransmit
@ -599,6 +605,7 @@ impl ReplayStage {
}
let root_slot = bank_forks.read().unwrap().root();
datapoint_info!("replay_stage-my_leader_slot", ("slot", poh_slot, i64),);
info!(
"new fork:{} parent:{} (leader) root:{}",
poh_slot, parent_slot, root_slot
@ -893,13 +900,30 @@ impl ReplayStage {
}
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
let parent_slot = bank.parent_slot();
let prev_leader_slot = progress.get_bank_prev_leader_slot(&bank);
let (num_blocks_on_fork, num_dropped_blocks_on_fork) = {
let stats = progress
.get(&parent_slot)
.expect("parent of active bank must exist in progress map");
let num_blocks_on_fork = stats.num_blocks_on_fork + 1;
let new_dropped_blocks = bank.slot() - parent_slot - 1;
let num_dropped_blocks_on_fork =
stats.num_dropped_blocks_on_fork + new_dropped_blocks;
(num_blocks_on_fork, num_dropped_blocks_on_fork)
};
// Insert a progress entry even for slots this node is the leader for, so that
// 1) confirm_forks can report confirmation, 2) we can cache computations about
// this bank in `select_forks()`
let bank_progress = &mut progress.entry(bank.slot()).or_insert_with(|| {
ForkProgress::new_from_bank(&bank, &my_pubkey, vote_account, prev_leader_slot)
ForkProgress::new_from_bank(
&bank,
&my_pubkey,
vote_account,
prev_leader_slot,
num_blocks_on_fork,
num_dropped_blocks_on_fork,
)
});
if bank.collector_id() != my_pubkey {
let replay_result = Self::replay_blockstore_into_bank(
@ -1740,7 +1764,7 @@ pub(crate) mod tests {
let bank = &bank_forks.banks[&0];
fork_progress
.entry(neutral_fork.fork[0])
.or_insert_with(|| ForkProgress::new(bank.last_blockhash(), None, None));
.or_insert_with(|| ForkProgress::new(bank.last_blockhash(), None, None, 0, 0));
}
for index in 1..neutral_fork.fork.len() {
@ -1766,7 +1790,7 @@ pub(crate) mod tests {
let bank = &bank_forks.banks[&neutral_fork.fork[index]];
fork_progress
.entry(bank_forks.banks[&neutral_fork.fork[index]].slot())
.or_insert_with(|| ForkProgress::new(bank.last_blockhash(), None, None));
.or_insert_with(|| ForkProgress::new(bank.last_blockhash(), None, None, 0, 0));
}
}
@ -1806,7 +1830,9 @@ pub(crate) mod tests {
let bank = &bank_forks.banks[&fork_info.fork[index]];
fork_progress
.entry(bank_forks.banks[&fork_info.fork[index]].slot())
.or_insert_with(|| ForkProgress::new(bank.last_blockhash(), None, None));
.or_insert_with(|| {
ForkProgress::new(bank.last_blockhash(), None, None, 0, 0)
});
}
}
}
@ -1942,7 +1968,14 @@ pub(crate) mod tests {
let mut progress = ProgressMap::default();
progress.insert(
0,
ForkProgress::new_from_bank(&bank0, bank0.collector_id(), &Pubkey::default(), None),
ForkProgress::new_from_bank(
&bank0,
bank0.collector_id(),
&Pubkey::default(),
None,
0,
0,
),
);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
let exit = Arc::new(AtomicBool::new(false));
@ -1966,6 +1999,8 @@ pub(crate) mod tests {
bank1.collector_id(),
&validator_voting_keys.get(&bank1.collector_id()).unwrap(),
Some(0),
0,
0,
),
);
assert!(progress.get_propagated_stats(1).unwrap().is_leader_slot);
@ -2060,7 +2095,7 @@ pub(crate) mod tests {
bank_forks.write().unwrap().insert(root_bank);
let mut progress = ProgressMap::default();
for i in 0..=root {
progress.insert(i, ForkProgress::new(Hash::default(), None, None));
progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0));
}
ReplayStage::handle_new_root(root, &bank_forks, &mut progress, &None, &mut HashSet::new());
assert_eq!(bank_forks.read().unwrap().root(), root);
@ -2298,7 +2333,7 @@ pub(crate) mod tests {
let last_blockhash = bank0.last_blockhash();
let mut bank0_progress = progress
.entry(bank0.slot())
.or_insert_with(|| ForkProgress::new(last_blockhash, None, None));
.or_insert_with(|| ForkProgress::new(last_blockhash, None, None, 0, 0));
let shreds = shred_to_insert(&mint_keypair, bank0.clone());
blockstore.insert_shreds(shreds, None, false).unwrap();
let res = ReplayStage::replay_blockstore_into_bank(
@ -2626,7 +2661,10 @@ pub(crate) mod tests {
// Insert the bank that contains a vote for slot 0, which confirms slot 0
bank_forks.write().unwrap().insert(bank1);
progress.insert(1, ForkProgress::new(bank0.last_blockhash(), None, None));
progress.insert(
1,
ForkProgress::new(bank0.last_blockhash(), None, None, 0, 0),
);
let ancestors = bank_forks.read().unwrap().ancestors();
let mut frozen_banks: Vec<_> = bank_forks
.read()
@ -2999,6 +3037,8 @@ pub(crate) mod tests {
total_epoch_stake,
..ValidatorStakeInfo::default()
}),
0,
0,
),
);
progress_map.insert(
@ -3010,6 +3050,8 @@ pub(crate) mod tests {
total_epoch_stake,
..ValidatorStakeInfo::default()
}),
0,
0,
),
);
@ -3085,16 +3127,22 @@ pub(crate) mod tests {
bank_forks.insert(Bank::new_from_parent(&parent_bank, &Pubkey::default(), i));
progress_map.insert(
i,
ForkProgress::new(Hash::default(), Some(prev_leader_slot), {
if i % 2 == 0 {
Some(ValidatorStakeInfo {
total_epoch_stake,
..ValidatorStakeInfo::default()
})
} else {
None
}
}),
ForkProgress::new(
Hash::default(),
Some(prev_leader_slot),
{
if i % 2 == 0 {
Some(ValidatorStakeInfo {
total_epoch_stake,
..ValidatorStakeInfo::default()
})
} else {
None
}
},
0,
0,
),
);
}
@ -3167,6 +3215,8 @@ pub(crate) mod tests {
total_epoch_stake,
..ValidatorStakeInfo::default()
}),
0,
0,
);
let end_range = {
@ -3222,7 +3272,7 @@ pub(crate) mod tests {
// If there is no previous leader slot (previous leader slot is None),
// should succeed
progress_map.insert(3, ForkProgress::new(Hash::default(), None, None));
progress_map.insert(3, ForkProgress::new(Hash::default(), None, None, 0, 0));
assert!(ReplayStage::check_propagation_for_start_leader(
poh_slot,
parent_slot,
@ -3232,7 +3282,13 @@ pub(crate) mod tests {
// If the parent was itself the leader, then requires propagation confirmation
progress_map.insert(
3,
ForkProgress::new(Hash::default(), None, Some(ValidatorStakeInfo::default())),
ForkProgress::new(
Hash::default(),
None,
Some(ValidatorStakeInfo::default()),
0,
0,
),
);
assert!(!ReplayStage::check_propagation_for_start_leader(
poh_slot,
@ -3252,10 +3308,16 @@ pub(crate) mod tests {
// Now, set up the progress map to show that the previous leader slot of 5 is
// 2 (even though the parent is 3), so 2 needs to see propagation confirmation
// before we can start a leader for block 5
progress_map.insert(3, ForkProgress::new(Hash::default(), Some(2), None));
progress_map.insert(3, ForkProgress::new(Hash::default(), Some(2), None, 0, 0));
progress_map.insert(
2,
ForkProgress::new(Hash::default(), None, Some(ValidatorStakeInfo::default())),
ForkProgress::new(
Hash::default(),
None,
Some(ValidatorStakeInfo::default()),
0,
0,
),
);
// Last leader slot has not seen propagation threshold, so should fail
@ -3308,11 +3370,23 @@ pub(crate) mod tests {
// which means 3 and 4 are consecutive leader slots
progress_map.insert(
3,
ForkProgress::new(Hash::default(), None, Some(ValidatorStakeInfo::default())),
ForkProgress::new(
Hash::default(),
None,
Some(ValidatorStakeInfo::default()),
0,
0,
),
);
progress_map.insert(
2,
ForkProgress::new(Hash::default(), None, Some(ValidatorStakeInfo::default())),
ForkProgress::new(
Hash::default(),
None,
Some(ValidatorStakeInfo::default()),
0,
0,
),
);
// If the last leader slot has not seen propagation threshold, but


@ -180,12 +180,30 @@ impl BankForks {
root: Slot,
snapshot_package_sender: &Option<SnapshotPackageSender>,
) {
let old_epoch = self.root_bank().epoch();
self.root = root;
let set_root_start = Instant::now();
let root_bank = self
.banks
.get(&root)
.expect("root bank didn't exist in bank_forks");
let new_epoch = root_bank.epoch();
if old_epoch != new_epoch {
info!(
"Root entering
epoch: {},
next_epoch_start_slot: {},
epoch_stakes: {:#?}",
new_epoch,
root_bank
.epoch_schedule()
.get_first_slot_in_epoch(new_epoch + 1),
root_bank
.epoch_stakes(new_epoch)
.unwrap()
.node_id_to_vote_accounts()
);
}
let root_tx_count = root_bank
.parents()
.last()


@ -7,7 +7,7 @@ use std::{collections::HashMap, sync::Arc};
pub type NodeIdToVoteAccounts = HashMap<Pubkey, NodeVoteAccounts>;
pub type EpochAuthorizedVoters = HashMap<Pubkey, Pubkey>;
#[derive(Clone, Serialize, Deserialize, Default, PartialEq)]
#[derive(Clone, Serialize, Debug, Deserialize, Default, PartialEq)]
pub struct NodeVoteAccounts {
pub vote_accounts: Vec<Pubkey>,
pub total_stake: u64,