diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 541928211d..2b1f20a19c 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -18,7 +18,7 @@ use solana_ledger::{ snapshot_package::SnapshotPackageSender, }; use solana_measure::measure::Measure; -use solana_metrics::{datapoint_warn, inc_new_counter_info}; +use solana_metrics::inc_new_counter_info; use solana_runtime::bank::Bank; use solana_sdk::{ clock::Slot, @@ -90,6 +90,7 @@ struct ForkStats { is_locked_out: bool, stake_lockouts: HashMap, computed: bool, + confirmation_reported: bool, } impl ReplaySlotStats { @@ -203,6 +204,10 @@ impl ReplayStage { .spawn(move || { let _exit = Finalizer::new(exit_.clone()); let mut progress = HashMap::new(); + // Initialize progress map with any root banks + for bank in bank_forks.read().unwrap().frozen_banks().values() { + progress.insert(bank.slot(), ForkProgress::new(bank.slot(), bank.last_blockhash())); + } let mut current_leader = None; let mut last_reset = Hash::default(); let mut partition = false; @@ -454,12 +459,9 @@ impl ReplayStage { fn replay_blocktree_into_bank( bank: &Arc, blocktree: &Blocktree, - progress: &mut HashMap, + bank_progress: &mut ForkProgress, ) -> (Result<()>, usize) { let mut tx_count = 0; - let bank_progress = &mut progress - .entry(bank.slot()) - .or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash())); let now = Instant::now(); let load_result = Self::load_blocktree_entries_with_shred_info(bank, blocktree, bank_progress); @@ -493,22 +495,14 @@ impl ReplayStage { ("error", format!("error: {:?}", replay_result), String), ("slot", bank.slot(), i64) ); - Self::mark_dead_slot(bank.slot(), blocktree, progress); + Self::mark_dead_slot(bank.slot(), blocktree, bank_progress); } (replay_result, tx_count) } - fn mark_dead_slot( - slot: Slot, - blocktree: &Blocktree, - progress: &mut HashMap, - ) { - // Remove from progress map so we no longer try to replay this bank - let mut progress_entry = progress - 
.get_mut(&slot) - .expect("Progress entry must exist after call to replay_entries_into_bank()"); - progress_entry.is_dead = true; + fn mark_dead_slot(slot: Slot, blocktree: &Blocktree, bank_progress: &mut ForkProgress) { + bank_progress.is_dead = true; blocktree .set_dead_slot(slot) .expect("Failed to mark slot as dead in blocktree"); @@ -650,9 +644,16 @@ impl ReplayStage { } let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone(); + + // Insert a progress entry even for slots this node is the leader for, so that + // 1) confirm_forks can report confirmation, 2) we can cache computations about + // this bank in `select_fork()` + let bank_progress = &mut progress + .entry(bank.slot()) + .or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash())); if bank.collector_id() != my_pubkey { let (replay_result, replay_tx_count) = - Self::replay_blocktree_into_bank(&bank, &blocktree, progress); + Self::replay_blocktree_into_bank(&bank, &blocktree, bank_progress); tx_count += replay_tx_count; if Self::is_replay_result_fatal(&replay_result) { trace!("replay_result_fatal slot {}", bank_slot); @@ -703,10 +704,15 @@ impl ReplayStage { let stats: Vec = frozen_banks .iter() .map(|bank| { + // Every frozen bank is expected to have a progress entry here: + // the map is seeded from the frozen banks at startup, and an entry is inserted + // for every active bank before replay, including slots this node is the leader for let mut stats = progress .get(&bank.slot()) - .map(|s| s.fork_stats.clone()) - .unwrap_or_default(); + .expect("All frozen banks must exist in the Progress map") + .fork_stats + .clone(); + if !stats.computed { stats.slot = bank.slot(); let (stake_lockouts, total_staked) = tower.collect_vote_lockouts( @@ -734,9 +740,10 @@ impl ReplayStage { stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors); stats.has_voted = tower.has_voted(bank.slot()); stats.is_recent = tower.is_recent(bank.slot()); - if let Some(fp) = progress.get_mut(&bank.slot()) { - fp.fork_stats = stats.clone(); - } + progress +
.get_mut(&bank.slot()) + .expect("All frozen banks must exist in the Progress map") + .fork_stats = stats.clone(); stats }) .collect(); @@ -836,29 +843,30 @@ impl ReplayStage { progress: &mut HashMap, bank_forks: &Arc>, ) { - progress.retain(|slot, prog| { - let duration = timing::timestamp() - prog.started_ms; - if tower.is_slot_confirmed(*slot, stake_lockouts, total_staked) - && bank_forks - .read() - .unwrap() - .get(*slot) - .map(|s| s.is_frozen()) - .unwrap_or(true) - { - info!("validator fork confirmed {} {}ms", *slot, duration); - datapoint_warn!("validator-confirmation", ("duration_ms", duration, i64)); - false - } else { - debug!( - "validator fork not confirmed {} {}ms {:?}", - *slot, - duration, - stake_lockouts.get(slot) - ); - true + for (slot, prog) in progress.iter_mut() { + if !prog.fork_stats.confirmation_reported { + let duration = timing::timestamp() - prog.started_ms; + if tower.is_slot_confirmed(*slot, stake_lockouts, total_staked) + && bank_forks + .read() + .unwrap() + .get(*slot) + .map(|s| s.is_frozen()) + .unwrap_or(true) + { + info!("validator fork confirmed {} {}ms", *slot, duration); + datapoint_warn!("validator-confirmation", ("duration_ms", duration, i64)); + prog.fork_stats.confirmation_reported = true; + } else { + debug!( + "validator fork not confirmed {} {}ms {:?}", + *slot, + duration, + stake_lockouts.get(slot) + ); + } } - }); + } } fn load_blocktree_entries_with_shred_info( @@ -1350,11 +1358,13 @@ mod test { let bank0 = Arc::new(Bank::new(&genesis_config)); let mut progress = HashMap::new(); let last_blockhash = bank0.last_blockhash(); - progress.insert(bank0.slot(), ForkProgress::new(0, last_blockhash)); + let mut bank0_progress = progress + .entry(bank0.slot()) + .or_insert_with(|| ForkProgress::new(0, last_blockhash)); let shreds = shred_to_insert(&mint_keypair, bank0.clone()); blocktree.insert_shreds(shreds, None, false).unwrap(); let (res, _tx_count) = - ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut
progress); + ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut bank0_progress); // Check that the erroring bank was marked as dead in the progress map assert!(progress