diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index bb6c9f3da9..d538454733 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -20,6 +20,39 @@ pub struct BankForks { root: u64, slots: HashSet, snapshot_path: Option, + confidence: HashMap, +} + +#[derive(Debug, Default, PartialEq)] +pub struct Confidence { + fork_stakes: u64, + epoch_stakes: u64, + lockouts: u64, + stake_weighted_lockouts: u128, +} + +impl Confidence { + pub fn new(fork_stakes: u64, epoch_stakes: u64, lockouts: u64) -> Self { + Self { + fork_stakes, + epoch_stakes, + lockouts, + stake_weighted_lockouts: 0, + } + } + pub fn new_with_stake_weighted( + fork_stakes: u64, + epoch_stakes: u64, + lockouts: u64, + stake_weighted_lockouts: u128, + ) -> Self { + Self { + fork_stakes, + epoch_stakes, + lockouts, + stake_weighted_lockouts, + } + } } impl Index for BankForks { @@ -40,6 +73,7 @@ impl BankForks { root: 0, slots: HashSet::new(), snapshot_path: None, + confidence: HashMap::new(), } } @@ -104,6 +138,7 @@ impl BankForks { working_bank, slots: HashSet::new(), snapshot_path: None, + confidence: HashMap::new(), } } @@ -161,6 +196,8 @@ impl BankForks { let descendants = self.descendants(); self.banks .retain(|slot, _| descendants[&root].contains(slot)); + self.confidence + .retain(|slot, _| slot == &root || descendants[&root].contains(slot)); if self.snapshot_path.is_some() { let diff: HashSet<_> = slots.symmetric_difference(&self.slots).collect(); trace!("prune non root {} - {:?}", root, diff); @@ -175,6 +212,41 @@ impl BankForks { self.slots = slots.clone(); } + pub fn cache_fork_confidence( + &mut self, + fork: u64, + fork_stakes: u64, + epoch_stakes: u64, + lockouts: u64, + ) { + self.confidence + .entry(fork) + .and_modify(|entry| { + entry.fork_stakes = fork_stakes; + entry.epoch_stakes = epoch_stakes; + entry.lockouts = lockouts; + }) + .or_insert_with(|| Confidence::new(fork_stakes, epoch_stakes, lockouts)); + } + + pub fn cache_stake_weighted_lockouts(&mut self, fork: u64, stake_weighted_lockouts: u128) { + self.confidence + .entry(fork) + .and_modify(|entry| { + entry.stake_weighted_lockouts = stake_weighted_lockouts; + }) + .or_insert(Confidence { + fork_stakes: 0, + epoch_stakes: 0, + lockouts: 0, + stake_weighted_lockouts, + }); + } + + pub fn get_fork_confidence(&self, fork: u64) -> Option<&Confidence> { + self.confidence.get(&fork) + } + fn get_io_error(error: &str) -> Error { warn!("BankForks error: {:?}", error); Error::new(ErrorKind::Other, error) @@ -356,6 +428,7 @@ impl BankForks { root, slots, snapshot_path: snapshot_path.clone(), + confidence: HashMap::new(), }) } } @@ -439,6 +512,46 @@ mod tests { assert_eq!(bank_forks.active_banks(), vec![1]); } + #[test] + fn test_bank_forks_confidence_cache() { + let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(10_000); + let bank = Bank::new(&genesis_block); + let fork = bank.slot(); + let mut bank_forks = BankForks::new(0, bank); + assert!(bank_forks.confidence.get(&fork).is_none()); + bank_forks.cache_fork_confidence(fork, 11, 12, 13); + assert_eq!( + bank_forks.confidence.get(&fork).unwrap(), + &Confidence { + fork_stakes: 11, + epoch_stakes: 12, + lockouts: 13, + stake_weighted_lockouts: 0, + } + ); + // Ensure that {fork_stakes, epoch_stakes, lockouts} and stake_weighted_lockouts + // can be updated separately + bank_forks.cache_stake_weighted_lockouts(fork, 20); + assert_eq!( + bank_forks.confidence.get(&fork).unwrap(), + &Confidence { + fork_stakes: 11, + epoch_stakes: 12, + lockouts: 13, + stake_weighted_lockouts: 20, + } + ); + bank_forks.cache_fork_confidence(fork, 21, 22, 23); + assert_eq!( + bank_forks + .confidence + .get(&fork) + .unwrap() + .stake_weighted_lockouts, + 20, + ); + } + struct TempPaths { pub paths: String, } diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 68dcb9366f..fef6af8393 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -28,6 +28,15 @@ pub struct StakeLockout { stake: u64, } +impl StakeLockout { + pub fn lockout(&self) -> u64 { + self.lockout + } + pub fn stake(&self) -> u64 { + self.stake + } +} + #[derive(Default)] pub struct Tower { epoch_stakes: EpochStakes, @@ -274,6 +283,10 @@ impl Tower { self.lockouts.root_slot } + pub fn total_epoch_stakes(&self) -> u64 { + self.epoch_stakes.total_staked + } + pub fn calculate_weight(&self, stake_lockouts: &HashMap) -> u128 { let mut sum = 0u128; let root_slot = self.lockouts.root_slot.unwrap_or(0); @@ -333,6 +346,27 @@ impl Tower { } } + pub fn aggregate_stake_lockouts( + root: Option, + ancestors: &HashMap>, + stake_lockouts: HashMap, + ) -> HashMap { + let mut stake_weighted_lockouts: HashMap = HashMap::new(); + for (fork, lockout) in stake_lockouts.iter() { + if root.is_none() || *fork >= root.unwrap() { + let mut slot_with_ancestors = vec![*fork]; + slot_with_ancestors.extend(ancestors.get(&fork).unwrap_or(&HashSet::new())); + for slot in slot_with_ancestors { + if root.is_none() || slot >= root.unwrap() { + let entry = stake_weighted_lockouts.entry(slot).or_default(); + *entry += u128::from(lockout.lockout) * u128::from(lockout.stake); + } + } + } + } + stake_weighted_lockouts + } + /// Update lockouts for all the ancestors fn update_ancestor_lockouts( stake_lockouts: &mut HashMap, @@ -434,6 +468,7 @@ mod test { .collect(); let staked_lockouts = tower.collect_vote_lockouts(1, accounts.into_iter(), &ancestors); assert!(staked_lockouts.is_empty()); + assert_eq!(tower.epoch_stakes.total_staked, 2); } #[test] @@ -448,6 +483,7 @@ mod test { let staked_lockouts = tower.collect_vote_lockouts(1, accounts.into_iter(), &ancestors); assert_eq!(staked_lockouts[&0].stake, 2); assert_eq!(staked_lockouts[&0].lockout, 2 + 2 + 4 + 4); + assert_eq!(tower.epoch_stakes.total_staked, 2); } #[test] @@ -530,6 +566,59 @@ mod test { assert!(tower.check_vote_stake_threshold(0, &stakes)); } + #[test] + fn test_aggregate_stake_lockouts() { + let mut tower = Tower::new(EpochStakes::new_for_tests(2), 0, 0.67); + tower.lockouts.root_slot = Some(1); + let stakes = vec![ + ( + 0, + StakeLockout { + stake: 1, + lockout: 32, + }, + ), + ( + 1, + StakeLockout { + stake: 1, + lockout: 24, + }, + ), + ( + 2, + StakeLockout { + stake: 1, + lockout: 16, + }, + ), + ( + 3, + StakeLockout { + stake: 1, + lockout: 8, + }, + ), + ] + .into_iter() + .collect(); + + let ancestors = vec![ + (0, HashSet::new()), + (1, vec![0].into_iter().collect()), + (2, vec![0, 1].into_iter().collect()), + (3, vec![0, 1, 2].into_iter().collect()), + ] + .into_iter() + .collect(); + let stake_weighted_lockouts = + Tower::aggregate_stake_lockouts(tower.root(), &ancestors, stakes); + assert!(stake_weighted_lockouts.get(&0).is_none()); + assert_eq!(*stake_weighted_lockouts.get(&1).unwrap(), 8 + 16 + 24); + assert_eq!(*stake_weighted_lockouts.get(&2).unwrap(), 8 + 16); + assert_eq!(*stake_weighted_lockouts.get(&3).unwrap(), 8); + } + #[test] fn test_is_slot_confirmed_not_enough_stake_failure() { let tower = Tower::new(EpochStakes::new_for_tests(2), 1, 0.67); @@ -800,17 +889,17 @@ mod test { for vote in &tower_votes { tower.record_vote(*vote, Hash::default()); } - let stakes_lockouts = + let staked_lockouts = tower.collect_vote_lockouts(vote_to_evaluate, accounts.clone().into_iter(), &ancestors); - assert!(tower.check_vote_stake_threshold(vote_to_evaluate, &stakes_lockouts)); + assert!(tower.check_vote_stake_threshold(vote_to_evaluate, &staked_lockouts)); // CASE 2: Now we want to evaluate a vote for slot VOTE_THRESHOLD_DEPTH + 1. This slot // will expire the vote in one of the vote accounts, so we should have insufficient // stake to pass the threshold let vote_to_evaluate = VOTE_THRESHOLD_DEPTH as u64 + 1; - let stakes_lockouts = + let staked_lockouts = tower.collect_vote_lockouts(vote_to_evaluate, accounts.into_iter(), &ancestors); - assert!(!tower.check_vote_stake_threshold(vote_to_evaluate, &stakes_lockouts)); + assert!(!tower.check_vote_stake_threshold(vote_to_evaluate, &staked_lockouts)); } fn vote_and_check_recent(num_votes: usize) { diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 23cd8f97c7..d000286eb9 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -50,6 +50,7 @@ impl Drop for Finalizer { pub struct ReplayStage { t_replay: JoinHandle>, + t_lockouts: JoinHandle<()>, } #[derive(Default)] @@ -106,6 +107,8 @@ impl ReplayStage { let vote_account = *vote_account; let voting_keypair = voting_keypair.cloned(); + let (lockouts_sender, t_lockouts) = aggregate_stake_lockouts(exit); + let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { @@ -138,7 +141,7 @@ impl ReplayStage { let votable = Self::generate_votable_banks(&bank_forks, &tower, &mut progress); - if let Some((_, bank)) = votable.last() { + if let Some((_, bank, lockouts)) = votable.into_iter().last() { subscriptions.notify_subscribers(bank.slot(), &bank_forks); if let Some(new_leader) = @@ -163,6 +166,8 @@ impl ReplayStage { &blocktree, &leader_schedule_cache, &root_bank_sender, + lockouts, + &lockouts_sender, )?; Self::reset_poh_recorder( @@ -212,7 +217,13 @@ impl ReplayStage { Ok(()) }) .unwrap(); - (Self { t_replay }, root_bank_receiver) + ( + Self { + t_replay, + t_lockouts, + }, + root_bank_receiver, + ) } fn log_leader_change( @@ -369,6 +380,8 @@ impl ReplayStage { blocktree: &Arc, leader_schedule_cache: &Arc, root_bank_sender: &Sender>>, + lockouts: HashMap, + lockouts_sender: &Sender, ) -> Result<()> where T: 'static + KeypairUtil + Send + Sync, @@ -400,6 +413,7 @@ impl ReplayStage { Err(e)?; } } + Self::update_confidence_cache(bank_forks, tower, lockouts, lockouts_sender); tower.update_epoch(&bank); if let Some(ref voting_keypair) = voting_keypair { let node_keypair = cluster_info.read().unwrap().keypair.clone(); @@ -422,6 +436,33 @@ impl ReplayStage { Ok(()) } + fn update_confidence_cache( + bank_forks: &Arc>, + tower: &Tower, + lockouts: HashMap, + lockouts_sender: &Sender, + ) { + let total_epoch_stakes = tower.total_epoch_stakes(); + let mut w_bank_forks = bank_forks.write().unwrap(); + for (fork, stake_lockout) in lockouts.iter() { + if tower.root().is_none() || *fork >= tower.root().unwrap() { + w_bank_forks.cache_fork_confidence( + *fork, + stake_lockout.stake(), + total_epoch_stakes, + stake_lockout.lockout(), + ); + } + } + drop(w_bank_forks); + let bank_forks_clone = bank_forks.clone(); + let root = tower.root(); + + if let Err(e) = lockouts_sender.send((lockouts, root, bank_forks_clone)) { + trace!("lockouts_sender failed: {:?}", e); + } + } + fn reset_poh_recorder( my_pubkey: &Pubkey, blocktree: &Blocktree, @@ -498,7 +539,7 @@ impl ReplayStage { bank_forks: &Arc>, tower: &Tower, progress: &mut HashMap, - ) -> Vec<(u128, Arc)> { + ) -> Vec<(u128, Arc, HashMap)> { let tower_start = Instant::now(); // Tower voting let descendants = bank_forks.read().unwrap().descendants(); @@ -506,7 +547,7 @@ impl ReplayStage { let frozen_banks = bank_forks.read().unwrap().frozen_banks(); trace!("frozen_banks {}", frozen_banks.len()); - let mut votable: Vec<(u128, Arc)> = frozen_banks + let mut votable: Vec<(u128, Arc, HashMap)> = frozen_banks .values() .filter(|b| { let is_votable = b.is_votable(); @@ -544,7 +585,13 @@ impl ReplayStage { debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold); vote_threshold }) - .map(|(b, stake_lockouts)| (tower.calculate_weight(&stake_lockouts), b.clone())) + .map(|(b, stake_lockouts)| { + ( + tower.calculate_weight(&stake_lockouts), + b.clone(), + stake_lockouts, + ) + }) .collect(); votable.sort_by_key(|b| b.0); @@ -716,17 +763,59 @@ impl Service for ReplayStage { type JoinReturnType = (); fn join(self) -> thread::Result<()> { + self.t_lockouts.join()?; self.t_replay.join().map(|_| ()) } } +type LockoutAggregationData = ( + HashMap, // lockouts + Option, // root + Arc>, // bank_forks +); + +fn aggregate_stake_lockouts( + exit: &Arc, +) -> (Sender, JoinHandle<()>) { + let (lockouts_sender, lockouts_receiver): ( + Sender, + Receiver, + ) = channel(); + let exit_ = exit.clone(); + ( + lockouts_sender, + Builder::new() + .name("solana-aggregate-stake-lockouts".to_string()) + .spawn(move || loop { + if exit_.load(Ordering::Relaxed) { + break; + } + if let Ok((lockouts, root, bank_forks)) = lockouts_receiver.try_recv() { + let ancestors = bank_forks.read().unwrap().ancestors(); + let stake_weighted_lockouts = + Tower::aggregate_stake_lockouts(root, &ancestors, lockouts); + let mut w_bank_forks = bank_forks.write().unwrap(); + for (fork, stake_weighted_lockout) in stake_weighted_lockouts.iter() { + if root.is_none() || *fork >= root.unwrap() { + w_bank_forks + .cache_stake_weighted_lockouts(*fork, *stake_weighted_lockout) + } + } + drop(w_bank_forks); + } + }) + .unwrap(), + ) +} + #[cfg(test)] mod test { use super::*; + use crate::bank_forks::Confidence; use crate::blocktree::get_tmp_ledger_path; use crate::entry; use crate::erasure::ErasureConfig; - use crate::genesis_utils::create_genesis_block; + use crate::genesis_utils::{create_genesis_block, create_genesis_block_with_leader}; use crate::packet::{Blob, BLOB_HEADER_SIZE}; use crate::replay_stage::ReplayStage; use solana_runtime::genesis_utils::GenesisBlockInfo; @@ -734,6 +823,7 @@ mod test { use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; use solana_sdk::transaction::TransactionError; + use solana_vote_api::vote_state::VoteState; use std::fs::remove_dir_all; use std::sync::{Arc, RwLock}; @@ -914,4 +1004,97 @@ mod test { let _ignored = remove_dir_all(&ledger_path); res } + + #[test] + fn test_replay_confidence_cache() { + fn leader_vote(bank: &Arc, pubkey: &Pubkey) { + let mut leader_vote_account = bank.get_account(&pubkey).unwrap(); + let mut vote_state = VoteState::from(&leader_vote_account).unwrap(); + vote_state.process_slot_vote_unchecked(bank.slot()); + vote_state.to(&mut leader_vote_account).unwrap(); + bank.store_account(&pubkey, &leader_vote_account); + } + + let (lockouts_sender, _) = aggregate_stake_lockouts(&Arc::new(AtomicBool::new(false))); + + let leader_pubkey = Pubkey::new_rand(); + let leader_lamports = 3; + let genesis_block_info = + create_genesis_block_with_leader(50, &leader_pubkey, leader_lamports); + let mut genesis_block = genesis_block_info.genesis_block; + let leader_voting_pubkey = genesis_block_info.voting_keypair.pubkey(); + genesis_block.epoch_warmup = false; + genesis_block.ticks_per_slot = 4; + let bank0 = Bank::new(&genesis_block); + for _ in 1..genesis_block.ticks_per_slot { + bank0.register_tick(&Hash::default()); + } + bank0.freeze(); + let arc_bank0 = Arc::new(bank0); + let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks( + &[arc_bank0.clone()], + 0, + ))); + let pubkey = Pubkey::new_rand(); + let mut tower = Tower::new_from_forks(&bank_forks.read().unwrap(), &pubkey); + let mut progress = HashMap::new(); + + leader_vote(&arc_bank0, &leader_voting_pubkey); + let votable = ReplayStage::generate_votable_banks(&bank_forks, &tower, &mut progress); + if let Some((_, _, lockouts)) = votable.into_iter().last() { + ReplayStage::update_confidence_cache(&bank_forks, &tower, lockouts, &lockouts_sender); + } + + assert_eq!( + bank_forks.read().unwrap().get_fork_confidence(0).unwrap(), + &Confidence::new(0, 1, 2) + ); + assert!(bank_forks.read().unwrap().get_fork_confidence(1).is_none()); + + tower.record_vote(arc_bank0.slot(), arc_bank0.hash()); + + let bank1 = Bank::new_from_parent(&arc_bank0, &Pubkey::default(), arc_bank0.slot() + 1); + let _res = bank1.transfer(10, &genesis_block_info.mint_keypair, &Pubkey::new_rand()); + for _ in 0..genesis_block.ticks_per_slot { + bank1.register_tick(&Hash::default()); + } + bank1.freeze(); + bank_forks.write().unwrap().insert(bank1); + let arc_bank1 = bank_forks.read().unwrap().get(1).unwrap().clone(); + leader_vote(&arc_bank1, &leader_voting_pubkey); + let votable = ReplayStage::generate_votable_banks(&bank_forks, &tower, &mut progress); + if let Some((_, _, lockouts)) = votable.into_iter().last() { + ReplayStage::update_confidence_cache(&bank_forks, &tower, lockouts, &lockouts_sender); + } + + tower.record_vote(arc_bank1.slot(), arc_bank1.hash()); + + let bank2 = Bank::new_from_parent(&arc_bank1, &Pubkey::default(), arc_bank1.slot() + 1); + let _res = bank2.transfer(10, &genesis_block_info.mint_keypair, &Pubkey::new_rand()); + for _ in 0..genesis_block.ticks_per_slot { + bank2.register_tick(&Hash::default()); + } + bank2.freeze(); + bank_forks.write().unwrap().insert(bank2); + let arc_bank2 = bank_forks.read().unwrap().get(2).unwrap().clone(); + leader_vote(&arc_bank2, &leader_voting_pubkey); + let votable = ReplayStage::generate_votable_banks(&bank_forks, &tower, &mut progress); + if let Some((_, _, lockouts)) = votable.into_iter().last() { + ReplayStage::update_confidence_cache(&bank_forks, &tower, lockouts, &lockouts_sender); + } + thread::sleep(Duration::from_millis(200)); + + assert_eq!( + bank_forks.read().unwrap().get_fork_confidence(0).unwrap(), + &Confidence::new_with_stake_weighted(1, 1, 14, 20) + ); + assert_eq!( + bank_forks.read().unwrap().get_fork_confidence(1).unwrap(), + &Confidence::new_with_stake_weighted(1, 1, 6, 6) + ); + assert_eq!( + bank_forks.read().unwrap().get_fork_confidence(2).unwrap(), + &Confidence::new_with_stake_weighted(0, 1, 2, 0) + ); + } }