diff --git a/core/src/confidence.rs b/core/src/confidence.rs index ab72b65d0..6b46fe665 100644 --- a/core/src/confidence.rs +++ b/core/src/confidence.rs @@ -1,4 +1,10 @@ +use crate::consensus::StakeLockout; +use crate::service::Service; use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::{Arc, RwLock}; +use std::thread::{self, Builder, JoinHandle}; #[derive(Debug, Default, PartialEq)] pub struct Confidence { @@ -82,6 +88,131 @@ impl ForkConfidenceCache { } } +pub struct ConfidenceAggregationData { + lockouts: HashMap, + root: Option, + ancestors: Arc>>, + total_staked: u64, +} + +impl ConfidenceAggregationData { + pub fn new( + lockouts: HashMap, + root: Option, + ancestors: Arc>>, + total_staked: u64, + ) -> Self { + Self { + lockouts, + root, + ancestors, + total_staked, + } + } +} + +pub struct AggregateConfidenceService { + t_confidence: JoinHandle<()>, +} + +impl AggregateConfidenceService { + pub fn aggregate_confidence( + root: Option, + ancestors: &HashMap>, + stake_lockouts: &HashMap, + ) -> HashMap { + let mut stake_weighted_lockouts: HashMap = HashMap::new(); + for (fork, lockout) in stake_lockouts.iter() { + if root.is_none() || *fork >= root.unwrap() { + let mut slot_with_ancestors = vec![*fork]; + slot_with_ancestors.extend(ancestors.get(&fork).unwrap_or(&HashSet::new())); + for slot in slot_with_ancestors { + if root.is_none() || slot >= root.unwrap() { + let entry = stake_weighted_lockouts.entry(slot).or_default(); + *entry += u128::from(lockout.lockout()) * u128::from(lockout.stake()); + } + } + } + } + stake_weighted_lockouts + } + + pub fn new( + exit: &Arc, + fork_confidence_cache: Arc>, + ) -> (Sender, Self) { + let (lockouts_sender, lockouts_receiver): ( + Sender, + Receiver, + ) = channel(); + let exit_ = exit.clone(); + ( + lockouts_sender, + Self { + t_confidence: Builder::new() + .name("solana-aggregate-stake-lockouts".to_string()) + .spawn(move || loop { + if exit_.load(Ordering::Relaxed) { + break; + } + if let Ok(aggregation_data) = lockouts_receiver.try_recv() { + let stake_weighted_lockouts = Self::aggregate_confidence( + aggregation_data.root, + &aggregation_data.ancestors, + &aggregation_data.lockouts, + ); + + let mut w_fork_confidence_cache = + fork_confidence_cache.write().unwrap(); + + // Cache the confidence values + for (fork, stake_lockout) in aggregation_data.lockouts.iter() { + if aggregation_data.root.is_none() + || *fork >= aggregation_data.root.unwrap() + { + w_fork_confidence_cache.cache_fork_confidence( + *fork, + stake_lockout.stake(), + aggregation_data.total_staked, + stake_lockout.lockout(), + ); + } + } + + // Cache the stake weighted lockouts + for (fork, stake_weighted_lockout) in stake_weighted_lockouts.iter() { + if aggregation_data.root.is_none() + || *fork >= aggregation_data.root.unwrap() + { + w_fork_confidence_cache.cache_stake_weighted_lockouts( + *fork, + *stake_weighted_lockout, + ) + } + } + + if let Some(root) = aggregation_data.root { + w_fork_confidence_cache + .prune_confidence_cache(&aggregation_data.ancestors, root); + } + + drop(w_fork_confidence_cache); + } + }) + .unwrap(), + }, + ) + } +} + +impl Service for AggregateConfidenceService { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + self.t_confidence.join() + } +} + #[cfg(test)] mod tests { use super::*; @@ -119,4 +250,30 @@ mod tests { 20, ); } + + #[test] + fn test_aggregate_confidence() { + let stakes = vec![ + (0, StakeLockout::new(1, 32)), + (1, StakeLockout::new(1, 24)), + (2, StakeLockout::new(1, 16)), + (3, StakeLockout::new(1, 8)), + ] + .into_iter() + .collect(); + let ancestors = vec![ + (0, HashSet::new()), + (1, vec![0].into_iter().collect()), + (2, vec![0, 1].into_iter().collect()), + (3, vec![0, 1, 2].into_iter().collect()), + ] + .into_iter() + .collect(); + let stake_weighted_lockouts = + AggregateConfidenceService::aggregate_confidence(Some(1), &ancestors, &stakes); + assert!(stake_weighted_lockouts.get(&0).is_none()); + assert_eq!(*stake_weighted_lockouts.get(&1).unwrap(), 8 + 16 + 24); + assert_eq!(*stake_weighted_lockouts.get(&2).unwrap(), 8 + 16); + assert_eq!(*stake_weighted_lockouts.get(&3).unwrap(), 8); + } } diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 52c616479..d5e738f74 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -18,6 +18,9 @@ pub struct StakeLockout { } impl StakeLockout { + pub fn new(lockout: u64, stake: u64) -> Self { + Self { lockout, stake } + } pub fn lockout(&self) -> u64 { self.lockout } @@ -303,27 +306,6 @@ impl Tower { } } - pub fn aggregate_stake_lockouts( - root: Option, - ancestors: &HashMap>, - stake_lockouts: &HashMap, - ) -> HashMap { - let mut stake_weighted_lockouts: HashMap = HashMap::new(); - for (fork, lockout) in stake_lockouts.iter() { - if root.is_none() || *fork >= root.unwrap() { - let mut slot_with_ancestors = vec![*fork]; - slot_with_ancestors.extend(ancestors.get(&fork).unwrap_or(&HashSet::new())); - for slot in slot_with_ancestors { - if root.is_none() || slot >= root.unwrap() { - let entry = stake_weighted_lockouts.entry(slot).or_default(); - *entry += u128::from(lockout.lockout) * u128::from(lockout.stake); - } - } - } - } - stake_weighted_lockouts - } - /// Update lockouts for all the ancestors fn update_ancestor_lockouts( stake_lockouts: &mut HashMap, @@ -535,59 +517,6 @@ mod test { assert!(tower.check_vote_stake_threshold(0, &stakes, 2)); } - #[test] - fn test_aggregate_stake_lockouts() { - let mut tower = Tower::new_for_tests(0, 0.67); - tower.lockouts.root_slot = Some(1); - let stakes = vec![ - ( - 0, - StakeLockout { - stake: 1, - lockout: 32, - }, - ), - ( - 1, - StakeLockout { - stake: 1, - lockout: 24, - }, - ), - ( - 2, - StakeLockout { - stake: 1, - lockout: 16, - }, - ), - ( - 3, - StakeLockout { - stake: 1, - lockout: 8, - }, - ), - ] - .into_iter() - .collect(); - - let ancestors = vec![ - (0, HashSet::new()), - (1, vec![0].into_iter().collect()), - (2, vec![0, 1].into_iter().collect()), - (3, vec![0, 1, 2].into_iter().collect()), - ] - .into_iter() - .collect(); - let stake_weighted_lockouts = - Tower::aggregate_stake_lockouts(tower.root(), &ancestors, &stakes); - assert!(stake_weighted_lockouts.get(&0).is_none()); - assert_eq!(*stake_weighted_lockouts.get(&1).unwrap(), 8 + 16 + 24); - assert_eq!(*stake_weighted_lockouts.get(&2).unwrap(), 8 + 16); - assert_eq!(*stake_weighted_lockouts.get(&3).unwrap(), 8); - } - #[test] fn test_is_slot_confirmed_not_enough_stake_failure() { let tower = Tower::new_for_tests(1, 0.67); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 0a4ec7497..6818fe911 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -4,7 +4,9 @@ use crate::bank_forks::BankForks; use crate::blocktree::{Blocktree, BlocktreeError}; use crate::blocktree_processor; use crate::cluster_info::ClusterInfo; -use crate::confidence::ForkConfidenceCache; +use crate::confidence::{ + AggregateConfidenceService, ConfidenceAggregationData, ForkConfidenceCache, +}; use crate::consensus::{StakeLockout, Tower}; use crate::entry::{Entry, EntrySlice}; use crate::leader_schedule_cache::LeaderScheduleCache; @@ -54,7 +56,7 @@ impl Drop for Finalizer { pub struct ReplayStage { t_replay: JoinHandle>, - t_lockouts: JoinHandle<()>, + confidence_service: AggregateConfidenceService, } #[derive(Default)] @@ -113,7 +115,8 @@ impl ReplayStage { let vote_account = *vote_account; let voting_keypair = voting_keypair.cloned(); - let (lockouts_sender, t_lockouts) = aggregate_stake_lockouts(exit, fork_confidence_cache); + let (lockouts_sender, confidence_service) = + AggregateConfidenceService::new(exit, fork_confidence_cache); let t_replay = Builder::new() .name("solana-replay-stage".to_string()) @@ -246,7 +249,7 @@ impl ReplayStage { ( Self { t_replay, - t_lockouts, + confidence_service, }, root_bank_receiver, ) @@ -414,7 +417,7 @@ impl ReplayStage { root_bank_sender: &Sender>>, lockouts: HashMap, total_staked: u64, - lockouts_sender: &Sender, + lockouts_sender: &Sender, snapshot_package_sender: &Option, ) -> Result<()> where @@ -477,14 +480,14 @@ impl ReplayStage { tower: &Tower, lockouts: HashMap, total_staked: u64, - lockouts_sender: &Sender, + lockouts_sender: &Sender, ) { - if let Err(e) = lockouts_sender.send(LockoutAggregationData { + if let Err(e) = lockouts_sender.send(ConfidenceAggregationData::new( lockouts, - root: tower.root(), - ancestors: ancestors.clone(), + tower.root(), + ancestors.clone(), total_staked, - }) { + )) { trace!("lockouts_sender failed: {:?}", e); } } @@ -789,80 +792,11 @@ impl Service for ReplayStage { type JoinReturnType = (); fn join(self) -> thread::Result<()> { - self.t_lockouts.join()?; + self.confidence_service.join()?; self.t_replay.join().map(|_| ()) } } -struct LockoutAggregationData { - lockouts: HashMap, - root: Option, - ancestors: Arc>>, - total_staked: u64, -} - -fn aggregate_stake_lockouts( - exit: &Arc, - fork_confidence_cache: Arc>, -) -> (Sender, JoinHandle<()>) { - let (lockouts_sender, lockouts_receiver): ( - Sender, - Receiver, - ) = channel(); - let exit_ = exit.clone(); - ( - lockouts_sender, - Builder::new() - .name("solana-aggregate-stake-lockouts".to_string()) - .spawn(move || loop { - if exit_.load(Ordering::Relaxed) { - break; - } - if let Ok(aggregation_data) = lockouts_receiver.try_recv() { - let stake_weighted_lockouts = Tower::aggregate_stake_lockouts( - aggregation_data.root, - &aggregation_data.ancestors, - &aggregation_data.lockouts, - ); - - let mut w_fork_confidence_cache = fork_confidence_cache.write().unwrap(); - - // Cache the confidence values - for (fork, stake_lockout) in aggregation_data.lockouts.iter() { - if aggregation_data.root.is_none() - || *fork >= aggregation_data.root.unwrap() - { - w_fork_confidence_cache.cache_fork_confidence( - *fork, - stake_lockout.stake(), - aggregation_data.total_staked, - stake_lockout.lockout(), - ); - } - } - - // Cache the stake weighted lockouts - for (fork, stake_weighted_lockout) in stake_weighted_lockouts.iter() { - if aggregation_data.root.is_none() - || *fork >= aggregation_data.root.unwrap() - { - w_fork_confidence_cache - .cache_stake_weighted_lockouts(*fork, *stake_weighted_lockout) - } - } - - if let Some(root) = aggregation_data.root { - w_fork_confidence_cache - .prune_confidence_cache(&aggregation_data.ancestors, root); - } - - drop(w_fork_confidence_cache); - } - }) - .unwrap(), - ) -} - #[cfg(test)] mod test { use super::*; @@ -1064,7 +998,7 @@ mod test { } let fork_confidence_cache = Arc::new(RwLock::new(ForkConfidenceCache::default())); - let (lockouts_sender, _) = aggregate_stake_lockouts( + let (lockouts_sender, _) = AggregateConfidenceService::new( &Arc::new(AtomicBool::new(false)), fork_confidence_cache.clone(), );