Refactor confidence from replay stage (#5938)
This commit is contained in:
parent
268beb3489
commit
2f50d0e145
|
@ -1,4 +1,10 @@
|
||||||
|
use crate::consensus::StakeLockout;
|
||||||
|
use crate::service::Service;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
|
|
||||||
#[derive(Debug, Default, PartialEq)]
|
#[derive(Debug, Default, PartialEq)]
|
||||||
pub struct Confidence {
|
pub struct Confidence {
|
||||||
|
@ -82,6 +88,131 @@ impl ForkConfidenceCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct ConfidenceAggregationData {
|
||||||
|
lockouts: HashMap<u64, StakeLockout>,
|
||||||
|
root: Option<u64>,
|
||||||
|
ancestors: Arc<HashMap<u64, HashSet<u64>>>,
|
||||||
|
total_staked: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConfidenceAggregationData {
|
||||||
|
pub fn new(
|
||||||
|
lockouts: HashMap<u64, StakeLockout>,
|
||||||
|
root: Option<u64>,
|
||||||
|
ancestors: Arc<HashMap<u64, HashSet<u64>>>,
|
||||||
|
total_staked: u64,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
lockouts,
|
||||||
|
root,
|
||||||
|
ancestors,
|
||||||
|
total_staked,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct AggregateConfidenceService {
|
||||||
|
t_confidence: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AggregateConfidenceService {
|
||||||
|
pub fn aggregate_confidence(
|
||||||
|
root: Option<u64>,
|
||||||
|
ancestors: &HashMap<u64, HashSet<u64>>,
|
||||||
|
stake_lockouts: &HashMap<u64, StakeLockout>,
|
||||||
|
) -> HashMap<u64, u128> {
|
||||||
|
let mut stake_weighted_lockouts: HashMap<u64, u128> = HashMap::new();
|
||||||
|
for (fork, lockout) in stake_lockouts.iter() {
|
||||||
|
if root.is_none() || *fork >= root.unwrap() {
|
||||||
|
let mut slot_with_ancestors = vec![*fork];
|
||||||
|
slot_with_ancestors.extend(ancestors.get(&fork).unwrap_or(&HashSet::new()));
|
||||||
|
for slot in slot_with_ancestors {
|
||||||
|
if root.is_none() || slot >= root.unwrap() {
|
||||||
|
let entry = stake_weighted_lockouts.entry(slot).or_default();
|
||||||
|
*entry += u128::from(lockout.lockout()) * u128::from(lockout.stake());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stake_weighted_lockouts
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new(
|
||||||
|
exit: &Arc<AtomicBool>,
|
||||||
|
fork_confidence_cache: Arc<RwLock<ForkConfidenceCache>>,
|
||||||
|
) -> (Sender<ConfidenceAggregationData>, Self) {
|
||||||
|
let (lockouts_sender, lockouts_receiver): (
|
||||||
|
Sender<ConfidenceAggregationData>,
|
||||||
|
Receiver<ConfidenceAggregationData>,
|
||||||
|
) = channel();
|
||||||
|
let exit_ = exit.clone();
|
||||||
|
(
|
||||||
|
lockouts_sender,
|
||||||
|
Self {
|
||||||
|
t_confidence: Builder::new()
|
||||||
|
.name("solana-aggregate-stake-lockouts".to_string())
|
||||||
|
.spawn(move || loop {
|
||||||
|
if exit_.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if let Ok(aggregation_data) = lockouts_receiver.try_recv() {
|
||||||
|
let stake_weighted_lockouts = Self::aggregate_confidence(
|
||||||
|
aggregation_data.root,
|
||||||
|
&aggregation_data.ancestors,
|
||||||
|
&aggregation_data.lockouts,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut w_fork_confidence_cache =
|
||||||
|
fork_confidence_cache.write().unwrap();
|
||||||
|
|
||||||
|
// Cache the confidence values
|
||||||
|
for (fork, stake_lockout) in aggregation_data.lockouts.iter() {
|
||||||
|
if aggregation_data.root.is_none()
|
||||||
|
|| *fork >= aggregation_data.root.unwrap()
|
||||||
|
{
|
||||||
|
w_fork_confidence_cache.cache_fork_confidence(
|
||||||
|
*fork,
|
||||||
|
stake_lockout.stake(),
|
||||||
|
aggregation_data.total_staked,
|
||||||
|
stake_lockout.lockout(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cache the stake weighted lockouts
|
||||||
|
for (fork, stake_weighted_lockout) in stake_weighted_lockouts.iter() {
|
||||||
|
if aggregation_data.root.is_none()
|
||||||
|
|| *fork >= aggregation_data.root.unwrap()
|
||||||
|
{
|
||||||
|
w_fork_confidence_cache.cache_stake_weighted_lockouts(
|
||||||
|
*fork,
|
||||||
|
*stake_weighted_lockout,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(root) = aggregation_data.root {
|
||||||
|
w_fork_confidence_cache
|
||||||
|
.prune_confidence_cache(&aggregation_data.ancestors, root);
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(w_fork_confidence_cache);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service for AggregateConfidenceService {
|
||||||
|
type JoinReturnType = ();
|
||||||
|
|
||||||
|
fn join(self) -> thread::Result<()> {
|
||||||
|
self.t_confidence.join()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -119,4 +250,30 @@ mod tests {
|
||||||
20,
|
20,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_aggregate_confidence() {
|
||||||
|
let stakes = vec![
|
||||||
|
(0, StakeLockout::new(1, 32)),
|
||||||
|
(1, StakeLockout::new(1, 24)),
|
||||||
|
(2, StakeLockout::new(1, 16)),
|
||||||
|
(3, StakeLockout::new(1, 8)),
|
||||||
|
]
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
let ancestors = vec![
|
||||||
|
(0, HashSet::new()),
|
||||||
|
(1, vec![0].into_iter().collect()),
|
||||||
|
(2, vec![0, 1].into_iter().collect()),
|
||||||
|
(3, vec![0, 1, 2].into_iter().collect()),
|
||||||
|
]
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
let stake_weighted_lockouts =
|
||||||
|
AggregateConfidenceService::aggregate_confidence(Some(1), &ancestors, &stakes);
|
||||||
|
assert!(stake_weighted_lockouts.get(&0).is_none());
|
||||||
|
assert_eq!(*stake_weighted_lockouts.get(&1).unwrap(), 8 + 16 + 24);
|
||||||
|
assert_eq!(*stake_weighted_lockouts.get(&2).unwrap(), 8 + 16);
|
||||||
|
assert_eq!(*stake_weighted_lockouts.get(&3).unwrap(), 8);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,9 @@ pub struct StakeLockout {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StakeLockout {
|
impl StakeLockout {
|
||||||
|
pub fn new(lockout: u64, stake: u64) -> Self {
|
||||||
|
Self { lockout, stake }
|
||||||
|
}
|
||||||
pub fn lockout(&self) -> u64 {
|
pub fn lockout(&self) -> u64 {
|
||||||
self.lockout
|
self.lockout
|
||||||
}
|
}
|
||||||
|
@ -303,27 +306,6 @@ impl Tower {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn aggregate_stake_lockouts(
|
|
||||||
root: Option<u64>,
|
|
||||||
ancestors: &HashMap<u64, HashSet<u64>>,
|
|
||||||
stake_lockouts: &HashMap<u64, StakeLockout>,
|
|
||||||
) -> HashMap<u64, u128> {
|
|
||||||
let mut stake_weighted_lockouts: HashMap<u64, u128> = HashMap::new();
|
|
||||||
for (fork, lockout) in stake_lockouts.iter() {
|
|
||||||
if root.is_none() || *fork >= root.unwrap() {
|
|
||||||
let mut slot_with_ancestors = vec![*fork];
|
|
||||||
slot_with_ancestors.extend(ancestors.get(&fork).unwrap_or(&HashSet::new()));
|
|
||||||
for slot in slot_with_ancestors {
|
|
||||||
if root.is_none() || slot >= root.unwrap() {
|
|
||||||
let entry = stake_weighted_lockouts.entry(slot).or_default();
|
|
||||||
*entry += u128::from(lockout.lockout) * u128::from(lockout.stake);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
stake_weighted_lockouts
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Update lockouts for all the ancestors
|
/// Update lockouts for all the ancestors
|
||||||
fn update_ancestor_lockouts(
|
fn update_ancestor_lockouts(
|
||||||
stake_lockouts: &mut HashMap<u64, StakeLockout>,
|
stake_lockouts: &mut HashMap<u64, StakeLockout>,
|
||||||
|
@ -535,59 +517,6 @@ mod test {
|
||||||
assert!(tower.check_vote_stake_threshold(0, &stakes, 2));
|
assert!(tower.check_vote_stake_threshold(0, &stakes, 2));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_aggregate_stake_lockouts() {
|
|
||||||
let mut tower = Tower::new_for_tests(0, 0.67);
|
|
||||||
tower.lockouts.root_slot = Some(1);
|
|
||||||
let stakes = vec![
|
|
||||||
(
|
|
||||||
0,
|
|
||||||
StakeLockout {
|
|
||||||
stake: 1,
|
|
||||||
lockout: 32,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
(
|
|
||||||
1,
|
|
||||||
StakeLockout {
|
|
||||||
stake: 1,
|
|
||||||
lockout: 24,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
(
|
|
||||||
2,
|
|
||||||
StakeLockout {
|
|
||||||
stake: 1,
|
|
||||||
lockout: 16,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
(
|
|
||||||
3,
|
|
||||||
StakeLockout {
|
|
||||||
stake: 1,
|
|
||||||
lockout: 8,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
]
|
|
||||||
.into_iter()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let ancestors = vec![
|
|
||||||
(0, HashSet::new()),
|
|
||||||
(1, vec![0].into_iter().collect()),
|
|
||||||
(2, vec![0, 1].into_iter().collect()),
|
|
||||||
(3, vec![0, 1, 2].into_iter().collect()),
|
|
||||||
]
|
|
||||||
.into_iter()
|
|
||||||
.collect();
|
|
||||||
let stake_weighted_lockouts =
|
|
||||||
Tower::aggregate_stake_lockouts(tower.root(), &ancestors, &stakes);
|
|
||||||
assert!(stake_weighted_lockouts.get(&0).is_none());
|
|
||||||
assert_eq!(*stake_weighted_lockouts.get(&1).unwrap(), 8 + 16 + 24);
|
|
||||||
assert_eq!(*stake_weighted_lockouts.get(&2).unwrap(), 8 + 16);
|
|
||||||
assert_eq!(*stake_weighted_lockouts.get(&3).unwrap(), 8);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_is_slot_confirmed_not_enough_stake_failure() {
|
fn test_is_slot_confirmed_not_enough_stake_failure() {
|
||||||
let tower = Tower::new_for_tests(1, 0.67);
|
let tower = Tower::new_for_tests(1, 0.67);
|
||||||
|
|
|
@ -4,7 +4,9 @@ use crate::bank_forks::BankForks;
|
||||||
use crate::blocktree::{Blocktree, BlocktreeError};
|
use crate::blocktree::{Blocktree, BlocktreeError};
|
||||||
use crate::blocktree_processor;
|
use crate::blocktree_processor;
|
||||||
use crate::cluster_info::ClusterInfo;
|
use crate::cluster_info::ClusterInfo;
|
||||||
use crate::confidence::ForkConfidenceCache;
|
use crate::confidence::{
|
||||||
|
AggregateConfidenceService, ConfidenceAggregationData, ForkConfidenceCache,
|
||||||
|
};
|
||||||
use crate::consensus::{StakeLockout, Tower};
|
use crate::consensus::{StakeLockout, Tower};
|
||||||
use crate::entry::{Entry, EntrySlice};
|
use crate::entry::{Entry, EntrySlice};
|
||||||
use crate::leader_schedule_cache::LeaderScheduleCache;
|
use crate::leader_schedule_cache::LeaderScheduleCache;
|
||||||
|
@ -54,7 +56,7 @@ impl Drop for Finalizer {
|
||||||
|
|
||||||
pub struct ReplayStage {
|
pub struct ReplayStage {
|
||||||
t_replay: JoinHandle<Result<()>>,
|
t_replay: JoinHandle<Result<()>>,
|
||||||
t_lockouts: JoinHandle<()>,
|
confidence_service: AggregateConfidenceService,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
|
@ -113,7 +115,8 @@ impl ReplayStage {
|
||||||
let vote_account = *vote_account;
|
let vote_account = *vote_account;
|
||||||
let voting_keypair = voting_keypair.cloned();
|
let voting_keypair = voting_keypair.cloned();
|
||||||
|
|
||||||
let (lockouts_sender, t_lockouts) = aggregate_stake_lockouts(exit, fork_confidence_cache);
|
let (lockouts_sender, confidence_service) =
|
||||||
|
AggregateConfidenceService::new(exit, fork_confidence_cache);
|
||||||
|
|
||||||
let t_replay = Builder::new()
|
let t_replay = Builder::new()
|
||||||
.name("solana-replay-stage".to_string())
|
.name("solana-replay-stage".to_string())
|
||||||
|
@ -246,7 +249,7 @@ impl ReplayStage {
|
||||||
(
|
(
|
||||||
Self {
|
Self {
|
||||||
t_replay,
|
t_replay,
|
||||||
t_lockouts,
|
confidence_service,
|
||||||
},
|
},
|
||||||
root_bank_receiver,
|
root_bank_receiver,
|
||||||
)
|
)
|
||||||
|
@ -414,7 +417,7 @@ impl ReplayStage {
|
||||||
root_bank_sender: &Sender<Vec<Arc<Bank>>>,
|
root_bank_sender: &Sender<Vec<Arc<Bank>>>,
|
||||||
lockouts: HashMap<u64, StakeLockout>,
|
lockouts: HashMap<u64, StakeLockout>,
|
||||||
total_staked: u64,
|
total_staked: u64,
|
||||||
lockouts_sender: &Sender<LockoutAggregationData>,
|
lockouts_sender: &Sender<ConfidenceAggregationData>,
|
||||||
snapshot_package_sender: &Option<SnapshotPackageSender>,
|
snapshot_package_sender: &Option<SnapshotPackageSender>,
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
where
|
||||||
|
@ -477,14 +480,14 @@ impl ReplayStage {
|
||||||
tower: &Tower,
|
tower: &Tower,
|
||||||
lockouts: HashMap<u64, StakeLockout>,
|
lockouts: HashMap<u64, StakeLockout>,
|
||||||
total_staked: u64,
|
total_staked: u64,
|
||||||
lockouts_sender: &Sender<LockoutAggregationData>,
|
lockouts_sender: &Sender<ConfidenceAggregationData>,
|
||||||
) {
|
) {
|
||||||
if let Err(e) = lockouts_sender.send(LockoutAggregationData {
|
if let Err(e) = lockouts_sender.send(ConfidenceAggregationData::new(
|
||||||
lockouts,
|
lockouts,
|
||||||
root: tower.root(),
|
tower.root(),
|
||||||
ancestors: ancestors.clone(),
|
ancestors.clone(),
|
||||||
total_staked,
|
total_staked,
|
||||||
}) {
|
)) {
|
||||||
trace!("lockouts_sender failed: {:?}", e);
|
trace!("lockouts_sender failed: {:?}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -789,80 +792,11 @@ impl Service for ReplayStage {
|
||||||
type JoinReturnType = ();
|
type JoinReturnType = ();
|
||||||
|
|
||||||
fn join(self) -> thread::Result<()> {
|
fn join(self) -> thread::Result<()> {
|
||||||
self.t_lockouts.join()?;
|
self.confidence_service.join()?;
|
||||||
self.t_replay.join().map(|_| ())
|
self.t_replay.join().map(|_| ())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct LockoutAggregationData {
|
|
||||||
lockouts: HashMap<u64, StakeLockout>,
|
|
||||||
root: Option<u64>,
|
|
||||||
ancestors: Arc<HashMap<u64, HashSet<u64>>>,
|
|
||||||
total_staked: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn aggregate_stake_lockouts(
|
|
||||||
exit: &Arc<AtomicBool>,
|
|
||||||
fork_confidence_cache: Arc<RwLock<ForkConfidenceCache>>,
|
|
||||||
) -> (Sender<LockoutAggregationData>, JoinHandle<()>) {
|
|
||||||
let (lockouts_sender, lockouts_receiver): (
|
|
||||||
Sender<LockoutAggregationData>,
|
|
||||||
Receiver<LockoutAggregationData>,
|
|
||||||
) = channel();
|
|
||||||
let exit_ = exit.clone();
|
|
||||||
(
|
|
||||||
lockouts_sender,
|
|
||||||
Builder::new()
|
|
||||||
.name("solana-aggregate-stake-lockouts".to_string())
|
|
||||||
.spawn(move || loop {
|
|
||||||
if exit_.load(Ordering::Relaxed) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if let Ok(aggregation_data) = lockouts_receiver.try_recv() {
|
|
||||||
let stake_weighted_lockouts = Tower::aggregate_stake_lockouts(
|
|
||||||
aggregation_data.root,
|
|
||||||
&aggregation_data.ancestors,
|
|
||||||
&aggregation_data.lockouts,
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut w_fork_confidence_cache = fork_confidence_cache.write().unwrap();
|
|
||||||
|
|
||||||
// Cache the confidence values
|
|
||||||
for (fork, stake_lockout) in aggregation_data.lockouts.iter() {
|
|
||||||
if aggregation_data.root.is_none()
|
|
||||||
|| *fork >= aggregation_data.root.unwrap()
|
|
||||||
{
|
|
||||||
w_fork_confidence_cache.cache_fork_confidence(
|
|
||||||
*fork,
|
|
||||||
stake_lockout.stake(),
|
|
||||||
aggregation_data.total_staked,
|
|
||||||
stake_lockout.lockout(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cache the stake weighted lockouts
|
|
||||||
for (fork, stake_weighted_lockout) in stake_weighted_lockouts.iter() {
|
|
||||||
if aggregation_data.root.is_none()
|
|
||||||
|| *fork >= aggregation_data.root.unwrap()
|
|
||||||
{
|
|
||||||
w_fork_confidence_cache
|
|
||||||
.cache_stake_weighted_lockouts(*fork, *stake_weighted_lockout)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(root) = aggregation_data.root {
|
|
||||||
w_fork_confidence_cache
|
|
||||||
.prune_confidence_cache(&aggregation_data.ancestors, root);
|
|
||||||
}
|
|
||||||
|
|
||||||
drop(w_fork_confidence_cache);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unwrap(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -1064,7 +998,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
let fork_confidence_cache = Arc::new(RwLock::new(ForkConfidenceCache::default()));
|
let fork_confidence_cache = Arc::new(RwLock::new(ForkConfidenceCache::default()));
|
||||||
let (lockouts_sender, _) = aggregate_stake_lockouts(
|
let (lockouts_sender, _) = AggregateConfidenceService::new(
|
||||||
&Arc::new(AtomicBool::new(false)),
|
&Arc::new(AtomicBool::new(false)),
|
||||||
fork_confidence_cache.clone(),
|
fork_confidence_cache.clone(),
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in New Issue