diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 41df06e518..b68722f4dc 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -466,7 +466,7 @@ impl Tower { } #[cfg(test)] -mod test { +pub mod test { use super::*; use crate::replay_stage::{ForkProgress, ReplayStage}; use solana_ledger::bank_forks::BankForks; @@ -489,13 +489,13 @@ mod test { use std::{thread::sleep, time::Duration}; use trees::{tr, Node, Tree}; - struct ValidatorKeypairs { + pub(crate) struct ValidatorKeypairs { node_keypair: Keypair, vote_keypair: Keypair, } impl ValidatorKeypairs { - fn new(node_keypair: Keypair, vote_keypair: Keypair) -> Self { + pub(crate) fn new(node_keypair: Keypair, vote_keypair: Keypair) -> Self { Self { node_keypair, vote_keypair, @@ -503,19 +503,19 @@ mod test { } } - struct VoteSimulator<'a> { + pub(crate) struct VoteSimulator<'a> { searchable_nodes: HashMap>, } impl<'a> VoteSimulator<'a> { - pub fn new(forks: &'a Tree) -> Self { + pub(crate) fn new(forks: &'a Tree) -> Self { let mut searchable_nodes = HashMap::new(); let root = forks.root(); searchable_nodes.insert(root.data, root); Self { searchable_nodes } } - pub fn simulate_vote( + pub(crate) fn simulate_vote( &mut self, vote_slot: Slot, bank_forks: &RwLock, @@ -592,7 +592,21 @@ mod test { let my_pubkey = my_keypairs.node_keypair.pubkey(); let my_vote_pubkey = my_keypairs.vote_keypair.pubkey(); let ancestors = bank_forks.read().unwrap().ancestors(); - ReplayStage::select_fork(&my_pubkey, &ancestors, &bank_forks, tower, progress); + let mut frozen_banks: Vec<_> = bank_forks + .read() + .unwrap() + .frozen_banks() + .values() + .cloned() + .collect(); + ReplayStage::compute_bank_stats( + &my_pubkey, + &ancestors, + &mut frozen_banks, + tower, + progress, + ); + ReplayStage::select_fork(&frozen_banks, tower, progress); let bank = bank_forks .read() @@ -666,21 +680,17 @@ mod test { } #[derive(PartialEq, Debug)] - enum VoteResult { + pub(crate) enum VoteResult { LockedOut(u64), 
FailedThreshold(u64), FailedAllChecks(u64), Ok, } - // Setup BankForks with banks including all the votes per validator as - // specified in the input `validator_votes` - fn initialize_state( - validator_votes: &HashMap>, + // Setup BankForks with bank 0 and all the validator accounts + pub(crate) fn initialize_state( validator_keypairs: &HashMap, ) -> (BankForks, HashMap) { - assert!(validator_votes.len() < 1_000_000); - let GenesisConfigInfo { mut genesis_config, mint_keypair, @@ -790,7 +800,7 @@ mod test { ); // Initialize BankForks - let (bank_forks, mut progress) = initialize_state(&HashMap::new(), &keypairs); + let (bank_forks, mut progress) = initialize_state(&keypairs); let bank_forks = RwLock::new(bank_forks); // Create the tree of banks @@ -870,8 +880,7 @@ mod test { votes.extend((45..=50).into_iter()); let mut cluster_votes: HashMap> = HashMap::new(); - cluster_votes.insert(node_pubkey, votes.clone()); - let (bank_forks, mut progress) = initialize_state(&cluster_votes, &keypairs); + let (bank_forks, mut progress) = initialize_state(&keypairs); let bank_forks = RwLock::new(bank_forks); // Simulate the votes. 
Should fail on trying to come back to the main fork diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index db718ae904..5b022dfa68 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -45,8 +45,6 @@ use std::{ pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; -type VoteAndPoHBank = Option<(Arc, ForkStats)>; - // Implement a destructor for the ReplayStage thread to signal it exited // even on panics struct Finalizer { @@ -251,13 +249,40 @@ impl ReplayStage { let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); loop { let start = allocated.get(); - let vote_bank = Self::select_fork( + let mut frozen_banks: Vec<_> = bank_forks + .read() + .unwrap() + .frozen_banks() + .values() + .cloned() + .collect(); + let newly_computed_slot_stats = Self::compute_bank_stats( &my_pubkey, &ancestors, - &bank_forks, + &mut frozen_banks, &tower, &mut progress, ); + for slot in newly_computed_slot_stats { + let fork_stats = &progress.get(&slot).unwrap().fork_stats; + let confirmed_forks = Self::confirm_forks( + &tower, + &fork_stats.stake_lockouts, + fork_stats.total_staked, + &progress, + &bank_forks, + ); + + for slot in confirmed_forks { + progress + .get_mut(&slot) + .unwrap() + .fork_stats + .confirmation_reported = true; + } + } + + let vote_bank = Self::select_fork(&frozen_banks, &tower, &mut progress); datapoint_debug!( "replay_stage-memory", ("select_fork", (allocated.get() - start) as i64, i64), @@ -265,12 +290,21 @@ impl ReplayStage { if vote_bank.is_none() { break; } - let (bank, stats) = vote_bank.unwrap(); + let bank = vote_bank.unwrap(); + let (is_locked_out, vote_threshold, fork_weight, total_staked) = { + let fork_stats = &progress.get(&bank.slot()).unwrap().fork_stats; + ( + fork_stats.is_locked_out, + fork_stats.vote_threshold, + fork_stats.fork_weight, + fork_stats.total_staked, + ) + }; let mut done = false; let mut vote_bank_slot = None; let start = allocated.get(); - if !stats.is_locked_out && stats.vote_threshold { - 
info!("voting: {} {}", bank.slot(), stats.fork_weight); + if !is_locked_out && vote_threshold { + info!("voting: {} {}", bank.slot(), fork_weight); subscriptions.notify_subscribers(bank.slot(), &bank_forks); if let Some(votable_leader) = leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank)) @@ -294,7 +328,7 @@ impl ReplayStage { &blockstore, &leader_schedule_cache, &root_bank_sender, - stats.total_staked, + total_staked, &lockouts_sender, &snapshot_package_sender, &latest_root_senders, @@ -738,23 +772,77 @@ impl ReplayStage { did_complete_bank } - pub(crate) fn select_fork( + pub(crate) fn compute_bank_stats( my_pubkey: &Pubkey, ancestors: &HashMap>, - bank_forks: &RwLock, + frozen_banks: &mut Vec>, tower: &Tower, progress: &mut HashMap, - ) -> VoteAndPoHBank { - let tower_start = Instant::now(); - - let mut frozen_banks: Vec<_> = bank_forks - .read() - .unwrap() - .frozen_banks() - .values() - .cloned() - .collect(); + ) -> Vec { frozen_banks.sort_by_key(|bank| bank.slot()); + let mut new_stats = vec![]; + for bank in frozen_banks { + // Only time progress map should be missing a bank slot + // is if this node was the leader for this slot as those banks + // are not replayed in replay_active_banks() + let parent_weight = bank + .parent() + .and_then(|b| progress.get(&b.slot())) + .map(|x| x.fork_stats.fork_weight) + .unwrap_or(0); + let stats = &mut progress + .get_mut(&bank.slot()) + .expect("All frozen banks must exist in the Progress map") + .fork_stats; + + if !stats.computed { + stats.slot = bank.slot(); + let (stake_lockouts, total_staked, bank_weight) = tower.collect_vote_lockouts( + bank.slot(), + bank.vote_accounts().into_iter(), + &ancestors, + ); + stats.total_staked = total_staked; + stats.weight = bank_weight; + stats.fork_weight = stats.weight + parent_weight; + + datapoint_warn!( + "bank_weight", + ("slot", bank.slot(), i64), + // u128 too large for influx, convert to hex + ("weight", format!("{:X}", stats.weight), String), + ); + warn!( + 
"{} slot_weight: {} {} {} {}", + my_pubkey, + stats.slot, + stats.weight, + stats.fork_weight, + bank.parent().map(|b| b.slot()).unwrap_or(0) + ); + stats.stake_lockouts = stake_lockouts; + stats.block_height = bank.block_height(); + stats.computed = true; + new_stats.push(stats.slot); // report only slots whose stats were computed by this call + } + stats.vote_threshold = tower.check_vote_stake_threshold( + bank.slot(), + &stats.stake_lockouts, + stats.total_staked, + ); + stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors); + stats.has_voted = tower.has_voted(bank.slot()); + stats.is_recent = tower.is_recent(bank.slot()); + } + new_stats + } + + pub(crate) fn select_fork( + frozen_banks: &[Arc], + tower: &Tower, + progress: &mut HashMap, + ) -> Option> { + let tower_start = Instant::now(); let num_frozen_banks = frozen_banks.len(); trace!("frozen_banks {}", frozen_banks.len()); @@ -763,72 +851,16 @@ impl ReplayStage { .filter(|b| b.slot() < tower.root().unwrap_or(0)) .count(); - let stats: Vec = frozen_banks + let stats: Vec<&ForkStats> = frozen_banks .iter() .map(|bank| { // Only time progress map should be missing a bank slot // is if this node was the leader for this slot as those banks // are not replayed in replay_active_banks() - let mut stats = progress + &progress .get(&bank.slot()) .expect("All frozen banks must exist in the Progress map") .fork_stats - .clone(); - - if !stats.computed { - stats.slot = bank.slot(); - let (stake_lockouts, total_staked, bank_weight) = tower.collect_vote_lockouts( - bank.slot(), - bank.vote_accounts().into_iter(), - &ancestors, - ); - Self::confirm_forks( - tower, - &stake_lockouts, - total_staked, - progress, - &bank_forks, - ); - stats.total_staked = total_staked; - stats.weight = bank_weight; - stats.fork_weight = stats.weight - + bank - .parent() - .and_then(|b| progress.get(&b.slot())) - .map(|x| x.fork_stats.fork_weight) - .unwrap_or(0); - - datapoint_warn!( - "bank_weight", - ("slot", bank.slot(), i64), - // u128 too large for influx, convert to hex - 
("weight", format!("{:X}", stats.weight), String), - ); - warn!( - "{} slot_weight: {} {} {} {}", - my_pubkey, - stats.slot, - stats.weight, - stats.fork_weight, - bank.parent().map(|b| b.slot()).unwrap_or(0) - ); - stats.stake_lockouts = stake_lockouts; - stats.block_height = bank.block_height(); - stats.computed = true; - } - stats.vote_threshold = tower.check_vote_stake_threshold( - bank.slot(), - &stats.stake_lockouts, - stats.total_staked, - ); - stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors); - stats.has_voted = tower.has_voted(bank.slot()); - stats.is_recent = tower.is_recent(bank.slot()); - progress - .get_mut(&bank.slot()) - .expect("All frozen banks must exist in the Progress map") - .fork_stats = stats.clone(); - stats }) .collect(); let num_not_recent = stats.iter().filter(|s| !s.is_recent).count(); @@ -874,30 +906,31 @@ impl ReplayStage { ), ("tower_duration", ms as i64, i64), ); - rv.cloned().map(|x| (x.0.clone(), x.1.clone())) + rv.map(|x| x.0.clone()) } fn confirm_forks( tower: &Tower, stake_lockouts: &HashMap, total_staked: u64, - progress: &mut HashMap, + progress: &HashMap, bank_forks: &RwLock, - ) { - for (slot, prog) in progress.iter_mut() { + ) -> Vec { + let mut confirmed_forks = vec![]; + for (slot, prog) in progress.iter() { if !prog.fork_stats.confirmation_reported { + let bank = bank_forks + .read() + .unwrap() + .get(*slot) + .expect("bank in progress must exist in BankForks") + .clone(); let duration = prog.replay_stats.started.elapsed().as_millis(); - if tower.is_slot_confirmed(*slot, stake_lockouts, total_staked) - && bank_forks - .read() - .unwrap() - .get(*slot) - .map(|s| s.is_frozen()) - .unwrap_or(true) + if bank.is_frozen() && tower.is_slot_confirmed(*slot, stake_lockouts, total_staked) { info!("validator fork confirmed {} {}ms", *slot, duration); datapoint_warn!("validator-confirmation", ("duration_ms", duration, i64)); - prog.fork_stats.confirmation_reported = true; + confirmed_forks.push(*slot); } else { 
debug!( "validator fork not confirmed {} {}ms {:?}", @@ -908,6 +941,7 @@ impl ReplayStage { } } } + confirmed_forks } pub(crate) fn handle_new_root( @@ -1002,6 +1036,7 @@ pub(crate) mod tests { use super::*; use crate::{ commitment::BlockCommitment, + consensus::test::{initialize_state, ValidatorKeypairs, VoteResult, VoteSimulator}, consensus::Tower, genesis_utils::{create_genesis_config, create_genesis_config_with_leader}, replay_stage::ReplayStage, @@ -1039,6 +1074,7 @@ pub(crate) mod tests { iter, sync::{Arc, RwLock}, }; + use trees::tr; struct ForkInfo { leader: usize, @@ -1228,19 +1264,28 @@ pub(crate) mod tests { (0..validators.len()) .map(|i| { - let response = ReplayStage::select_fork( + let mut frozen_banks: Vec<_> = wrapped_bank_fork + .read() + .unwrap() + .frozen_banks() + .values() + .cloned() + .collect(); + ReplayStage::compute_bank_stats( &validators[i].keypair.pubkey(), &bank_fork_ancestors, - &wrapped_bank_fork, + &mut frozen_banks, &towers[i], &mut fork_progresses[i], ); + let response = + ReplayStage::select_fork(&frozen_banks, &towers[i], &mut fork_progresses[i]); if response.is_none() { None } else { - let (_bank, stats) = response.unwrap(); - + let bank = response.unwrap(); + let stats = &fork_progresses[i].get(&bank.slot()).unwrap().fork_stats; Some(ForkSelectionResponse { slot: stats.slot, is_locked_out: stats.is_locked_out, @@ -1847,4 +1892,72 @@ pub(crate) mod tests { } Blockstore::destroy(&ledger_path).unwrap(); } + + #[test] + fn test_child_bank_heavier() { + let node_keypair = Keypair::new(); + let vote_keypair = Keypair::new(); + let node_pubkey = node_keypair.pubkey(); + let mut keypairs = HashMap::new(); + keypairs.insert( + node_pubkey, + ValidatorKeypairs::new(node_keypair, vote_keypair), + ); + + let (bank_forks, mut progress) = initialize_state(&keypairs); + let bank_forks = Arc::new(RwLock::new(bank_forks)); + let mut tower = Tower::new_with_key(&node_pubkey); + + // Create the tree of banks in a BankForks object + let forks = 
tr(0) / (tr(1) / (tr(2) / (tr(3)))); + + let mut voting_simulator = VoteSimulator::new(&forks); + let mut cluster_votes: HashMap> = HashMap::new(); + let votes: Vec = vec![0, 2]; + for vote in &votes { + assert_eq!( + voting_simulator.simulate_vote( + *vote, + &bank_forks, + &mut cluster_votes, + &keypairs, + keypairs.get(&node_pubkey).unwrap(), + &mut progress, + &mut tower, + ), + VoteResult::Ok + ); + } + + let mut frozen_banks: Vec<_> = bank_forks + .read() + .unwrap() + .frozen_banks() + .values() + .cloned() + .collect(); + + ReplayStage::compute_bank_stats( + &Pubkey::default(), + &bank_forks.read().unwrap().ancestors(), + &mut frozen_banks, + &tower, + &mut progress, + ); + + frozen_banks.sort_by_key(|bank| bank.slot()); + for pair in frozen_banks.windows(2) { + let first = progress + .get(&pair[0].slot()) + .unwrap() + .fork_stats + .fork_weight; + let second = progress + .get(&pair[1].slot()) + .unwrap() + .fork_stats + .fork_weight; + assert!(second >= first); + } + } }