Offload remaining confidence cache computation to separate thread (#5792)

* Move remaining confidence cache computation to separate thread

* Move confidence cache out of bank forks
This commit is contained in:
carllin 2019-09-04 23:10:25 -07:00 committed by GitHub
parent f78b865cba
commit bd74e63702
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 243 additions and 165 deletions

View File

@ -32,39 +32,6 @@ pub struct BankForks {
root: u64, root: u64,
snapshot_config: Option<SnapshotConfig>, snapshot_config: Option<SnapshotConfig>,
slots_since_snapshot: Vec<u64>, slots_since_snapshot: Vec<u64>,
confidence: HashMap<u64, Confidence>,
}
#[derive(Debug, Default, PartialEq)]
pub struct Confidence {
fork_stakes: u64,
epoch_stakes: u64,
lockouts: u64,
stake_weighted_lockouts: u128,
}
impl Confidence {
pub fn new(fork_stakes: u64, epoch_stakes: u64, lockouts: u64) -> Self {
Self {
fork_stakes,
epoch_stakes,
lockouts,
stake_weighted_lockouts: 0,
}
}
pub fn new_with_stake_weighted(
fork_stakes: u64,
epoch_stakes: u64,
lockouts: u64,
stake_weighted_lockouts: u128,
) -> Self {
Self {
fork_stakes,
epoch_stakes,
lockouts,
stake_weighted_lockouts,
}
}
} }
impl Index<u64> for BankForks { impl Index<u64> for BankForks {
@ -85,7 +52,6 @@ impl BankForks {
root: 0, root: 0,
snapshot_config: None, snapshot_config: None,
slots_since_snapshot: vec![bank_slot], slots_since_snapshot: vec![bank_slot],
confidence: HashMap::new(),
} }
} }
@ -161,7 +127,6 @@ impl BankForks {
working_bank, working_bank,
snapshot_config: None, snapshot_config: None,
slots_since_snapshot: rooted_path, slots_since_snapshot: rooted_path,
confidence: HashMap::new(),
} }
} }
@ -309,43 +274,6 @@ impl BankForks {
let descendants = self.descendants(); let descendants = self.descendants();
self.banks self.banks
.retain(|slot, _| slot == &root || descendants[&root].contains(slot)); .retain(|slot, _| slot == &root || descendants[&root].contains(slot));
self.confidence
.retain(|slot, _| slot == &root || descendants[&root].contains(slot));
}
pub fn cache_fork_confidence(
&mut self,
fork: u64,
fork_stakes: u64,
epoch_stakes: u64,
lockouts: u64,
) {
self.confidence
.entry(fork)
.and_modify(|entry| {
entry.fork_stakes = fork_stakes;
entry.epoch_stakes = epoch_stakes;
entry.lockouts = lockouts;
})
.or_insert_with(|| Confidence::new(fork_stakes, epoch_stakes, lockouts));
}
pub fn cache_stake_weighted_lockouts(&mut self, fork: u64, stake_weighted_lockouts: u128) {
self.confidence
.entry(fork)
.and_modify(|entry| {
entry.stake_weighted_lockouts = stake_weighted_lockouts;
})
.or_insert(Confidence {
fork_stakes: 0,
epoch_stakes: 0,
lockouts: 0,
stake_weighted_lockouts,
});
}
pub fn get_fork_confidence(&self, fork: u64) -> Option<&Confidence> {
self.confidence.get(&fork)
} }
pub fn set_snapshot_config(&mut self, snapshot_config: SnapshotConfig) { pub fn set_snapshot_config(&mut self, snapshot_config: SnapshotConfig) {
@ -442,46 +370,6 @@ mod tests {
assert_eq!(bank_forks.active_banks(), vec![1]); assert_eq!(bank_forks.active_banks(), vec![1]);
} }
#[test]
fn test_bank_forks_confidence_cache() {
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(10_000);
let bank = Bank::new(&genesis_block);
let fork = bank.slot();
let mut bank_forks = BankForks::new(0, bank);
assert!(bank_forks.confidence.get(&fork).is_none());
bank_forks.cache_fork_confidence(fork, 11, 12, 13);
assert_eq!(
bank_forks.confidence.get(&fork).unwrap(),
&Confidence {
fork_stakes: 11,
epoch_stakes: 12,
lockouts: 13,
stake_weighted_lockouts: 0,
}
);
// Ensure that {fork_stakes, epoch_stakes, lockouts} and stake_weighted_lockouts
// can be updated separately
bank_forks.cache_stake_weighted_lockouts(fork, 20);
assert_eq!(
bank_forks.confidence.get(&fork).unwrap(),
&Confidence {
fork_stakes: 11,
epoch_stakes: 12,
lockouts: 13,
stake_weighted_lockouts: 20,
}
);
bank_forks.cache_fork_confidence(fork, 21, 22, 23);
assert_eq!(
bank_forks
.confidence
.get(&fork)
.unwrap()
.stake_weighted_lockouts,
20,
);
}
fn restore_from_snapshot(old_bank_forks: &BankForks, account_paths: String) { fn restore_from_snapshot(old_bank_forks: &BankForks, account_paths: String) {
let (snapshot_path, snapshot_package_output_path) = old_bank_forks let (snapshot_path, snapshot_package_output_path) = old_bank_forks
.snapshot_config .snapshot_config

119
core/src/confidence.rs Normal file
View File

@ -0,0 +1,119 @@
use std::collections::{HashMap, HashSet};
#[derive(Debug, Default, PartialEq)]
pub struct Confidence {
fork_stakes: u64,
total_stake: u64,
lockouts: u64,
stake_weighted_lockouts: u128,
}
impl Confidence {
pub fn new(fork_stakes: u64, total_stake: u64, lockouts: u64) -> Self {
Self {
fork_stakes,
total_stake,
lockouts,
stake_weighted_lockouts: 0,
}
}
pub fn new_with_stake_weighted(
fork_stakes: u64,
total_stake: u64,
lockouts: u64,
stake_weighted_lockouts: u128,
) -> Self {
Self {
fork_stakes,
total_stake,
lockouts,
stake_weighted_lockouts,
}
}
}
#[derive(Default, PartialEq)]
pub struct ForkConfidenceCache {
confidence: HashMap<u64, Confidence>,
}
impl ForkConfidenceCache {
pub fn cache_fork_confidence(
&mut self,
fork: u64,
fork_stakes: u64,
total_stake: u64,
lockouts: u64,
) {
self.confidence
.entry(fork)
.and_modify(|entry| {
entry.fork_stakes = fork_stakes;
entry.total_stake = total_stake;
entry.lockouts = lockouts;
})
.or_insert_with(|| Confidence::new(fork_stakes, total_stake, lockouts));
}
pub fn cache_stake_weighted_lockouts(&mut self, fork: u64, stake_weighted_lockouts: u128) {
self.confidence
.entry(fork)
.and_modify(|entry| {
entry.stake_weighted_lockouts = stake_weighted_lockouts;
})
.or_insert(Confidence {
fork_stakes: 0,
total_stake: 0,
lockouts: 0,
stake_weighted_lockouts,
});
}
pub fn get_fork_confidence(&self, fork: u64) -> Option<&Confidence> {
self.confidence.get(&fork)
}
pub fn prune_confidence_cache(&mut self, ancestors: &HashMap<u64, HashSet<u64>>, root: u64) {
self.confidence
.retain(|slot, _| slot == &root || ancestors[&slot].contains(&root));
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_fork_confidence_cache() {
let mut cache = ForkConfidenceCache::default();
let fork = 0;
assert!(cache.confidence.get(&fork).is_none());
cache.cache_fork_confidence(fork, 11, 12, 13);
assert_eq!(
cache.confidence.get(&fork).unwrap(),
&Confidence {
fork_stakes: 11,
total_stake: 12,
lockouts: 13,
stake_weighted_lockouts: 0,
}
);
// Ensure that {fork_stakes, total_stake, lockouts} and stake_weighted_lockouts
// can be updated separately
cache.cache_stake_weighted_lockouts(fork, 20);
assert_eq!(
cache.confidence.get(&fork).unwrap(),
&Confidence {
fork_stakes: 11,
total_stake: 12,
lockouts: 13,
stake_weighted_lockouts: 20,
}
);
cache.cache_fork_confidence(fork, 21, 22, 23);
assert_eq!(
cache.confidence.get(&fork).unwrap().stake_weighted_lockouts,
20,
);
}
}

View File

@ -304,7 +304,7 @@ impl Tower {
pub fn aggregate_stake_lockouts( pub fn aggregate_stake_lockouts(
root: Option<u64>, root: Option<u64>,
ancestors: &HashMap<u64, HashSet<u64>>, ancestors: &HashMap<u64, HashSet<u64>>,
stake_lockouts: HashMap<u64, StakeLockout>, stake_lockouts: &HashMap<u64, StakeLockout>,
) -> HashMap<u64, u128> { ) -> HashMap<u64, u128> {
let mut stake_weighted_lockouts: HashMap<u64, u128> = HashMap::new(); let mut stake_weighted_lockouts: HashMap<u64, u128> = HashMap::new();
for (fork, lockout) in stake_lockouts.iter() { for (fork, lockout) in stake_lockouts.iter() {
@ -562,7 +562,7 @@ mod test {
.into_iter() .into_iter()
.collect(); .collect();
let stake_weighted_lockouts = let stake_weighted_lockouts =
Tower::aggregate_stake_lockouts(tower.root(), &ancestors, stakes); Tower::aggregate_stake_lockouts(tower.root(), &ancestors, &stakes);
assert!(stake_weighted_lockouts.get(&0).is_none()); assert!(stake_weighted_lockouts.get(&0).is_none());
assert_eq!(*stake_weighted_lockouts.get(&1).unwrap(), 8 + 16 + 24); assert_eq!(*stake_weighted_lockouts.get(&1).unwrap(), 8 + 16 + 24);
assert_eq!(*stake_weighted_lockouts.get(&2).unwrap(), 8 + 16); assert_eq!(*stake_weighted_lockouts.get(&2).unwrap(), 8 + 16);

View File

@ -13,6 +13,7 @@ pub mod chacha;
#[cfg(cuda)] #[cfg(cuda)]
pub mod chacha_cuda; pub mod chacha_cuda;
pub mod cluster_info_vote_listener; pub mod cluster_info_vote_listener;
pub mod confidence;
pub mod recycler; pub mod recycler;
#[macro_use] #[macro_use]
pub mod contact_info; pub mod contact_info;

View File

@ -4,6 +4,7 @@ use crate::bank_forks::BankForks;
use crate::blocktree::{Blocktree, BlocktreeError}; use crate::blocktree::{Blocktree, BlocktreeError};
use crate::blocktree_processor; use crate::blocktree_processor;
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::confidence::ForkConfidenceCache;
use crate::consensus::{StakeLockout, Tower}; use crate::consensus::{StakeLockout, Tower};
use crate::entry::{Entry, EntrySlice}; use crate::entry::{Entry, EntrySlice};
use crate::leader_schedule_cache::LeaderScheduleCache; use crate::leader_schedule_cache::LeaderScheduleCache;
@ -22,6 +23,7 @@ use solana_sdk::timing::{self, duration_as_ms};
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use solana_vote_api::vote_instruction; use solana_vote_api::vote_instruction;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
@ -93,6 +95,7 @@ impl ReplayStage {
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
slot_full_senders: Vec<Sender<(u64, Pubkey)>>, slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
snapshot_package_sender: Option<SnapshotPackageSender>, snapshot_package_sender: Option<SnapshotPackageSender>,
fork_confidence_cache: Arc<RwLock<ForkConfidenceCache>>,
) -> (Self, Receiver<Vec<Arc<Bank>>>) ) -> (Self, Receiver<Vec<Arc<Bank>>>)
where where
T: 'static + KeypairUtil + Send + Sync, T: 'static + KeypairUtil + Send + Sync,
@ -110,7 +113,7 @@ impl ReplayStage {
let vote_account = *vote_account; let vote_account = *vote_account;
let voting_keypair = voting_keypair.cloned(); let voting_keypair = voting_keypair.cloned();
let (lockouts_sender, t_lockouts) = aggregate_stake_lockouts(exit); let (lockouts_sender, t_lockouts) = aggregate_stake_lockouts(exit, fork_confidence_cache);
let t_replay = Builder::new() let t_replay = Builder::new()
.name("solana-replay-stage".to_string()) .name("solana-replay-stage".to_string())
@ -142,7 +145,13 @@ impl ReplayStage {
&slot_full_senders, &slot_full_senders,
); );
let votable = Self::generate_votable_banks(&bank_forks, &tower, &mut progress); let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
let votable = Self::generate_votable_banks(
&ancestors,
&bank_forks,
&tower,
&mut progress,
);
if let Some((_, bank, lockouts, total_staked)) = votable.into_iter().last() { if let Some((_, bank, lockouts, total_staked)) = votable.into_iter().last() {
subscriptions.notify_subscribers(bank.slot(), &bank_forks); subscriptions.notify_subscribers(bank.slot(), &bank_forks);
@ -172,6 +181,7 @@ impl ReplayStage {
Self::handle_votable_bank( Self::handle_votable_bank(
&bank, &bank,
&bank_forks, &bank_forks,
&ancestors,
&mut tower, &mut tower,
&mut progress, &mut progress,
&vote_account, &vote_account,
@ -392,6 +402,7 @@ impl ReplayStage {
fn handle_votable_bank<T>( fn handle_votable_bank<T>(
bank: &Arc<Bank>, bank: &Arc<Bank>,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
ancestors: &Arc<HashMap<u64, HashSet<u64>>>,
tower: &mut Tower, tower: &mut Tower,
progress: &mut HashMap<u64, ForkProgress>, progress: &mut HashMap<u64, ForkProgress>,
vote_account: &Pubkey, vote_account: &Pubkey,
@ -440,7 +451,7 @@ impl ReplayStage {
Err(e)?; Err(e)?;
} }
} }
Self::update_confidence_cache(bank_forks, tower, lockouts, total_staked, lockouts_sender); Self::update_confidence_cache(ancestors, tower, lockouts, total_staked, lockouts_sender);
if let Some(ref voting_keypair) = voting_keypair { if let Some(ref voting_keypair) = voting_keypair {
let node_keypair = cluster_info.read().unwrap().keypair.clone(); let node_keypair = cluster_info.read().unwrap().keypair.clone();
@ -461,30 +472,18 @@ impl ReplayStage {
} }
fn update_confidence_cache( fn update_confidence_cache(
bank_forks: &Arc<RwLock<BankForks>>, ancestors: &Arc<HashMap<u64, HashSet<u64>>>,
tower: &Tower, tower: &Tower,
lockouts: HashMap<u64, StakeLockout>, lockouts: HashMap<u64, StakeLockout>,
total_staked: u64, total_staked: u64,
lockouts_sender: &Sender<LockoutAggregationData>, lockouts_sender: &Sender<LockoutAggregationData>,
) { ) {
{ if let Err(e) = lockouts_sender.send(LockoutAggregationData {
let mut bank_forks = bank_forks.write().unwrap(); lockouts,
for (fork, stake_lockout) in lockouts.iter() { root: tower.root(),
if tower.root().is_none() || *fork >= tower.root().unwrap() { ancestors: ancestors.clone(),
bank_forks.cache_fork_confidence( total_staked,
*fork, }) {
stake_lockout.stake(),
total_staked,
stake_lockout.lockout(),
);
}
}
}
let bank_forks_clone = bank_forks.clone();
let root = tower.root();
if let Err(e) = lockouts_sender.send((lockouts, root, bank_forks_clone)) {
trace!("lockouts_sender failed: {:?}", e); trace!("lockouts_sender failed: {:?}", e);
} }
} }
@ -567,15 +566,13 @@ impl ReplayStage {
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
fn generate_votable_banks( fn generate_votable_banks(
ancestors: &HashMap<u64, HashSet<u64>>,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
tower: &Tower, tower: &Tower,
progress: &mut HashMap<u64, ForkProgress>, progress: &mut HashMap<u64, ForkProgress>,
) -> Vec<(u128, Arc<Bank>, HashMap<u64, StakeLockout>, u64)> { ) -> Vec<(u128, Arc<Bank>, HashMap<u64, StakeLockout>, u64)> {
let tower_start = Instant::now(); let tower_start = Instant::now();
// Tower voting
let ancestors = bank_forks.read().unwrap().ancestors();
let frozen_banks = bank_forks.read().unwrap().frozen_banks(); let frozen_banks = bank_forks.read().unwrap().frozen_banks();
trace!("frozen_banks {}", frozen_banks.len()); trace!("frozen_banks {}", frozen_banks.len());
let mut votable: Vec<(u128, Arc<Bank>, HashMap<u64, StakeLockout>, u64)> = frozen_banks let mut votable: Vec<(u128, Arc<Bank>, HashMap<u64, StakeLockout>, u64)> = frozen_banks
.values() .values()
@ -796,14 +793,16 @@ impl Service for ReplayStage {
} }
} }
type LockoutAggregationData = ( struct LockoutAggregationData {
HashMap<u64, StakeLockout>, // lockouts lockouts: HashMap<u64, StakeLockout>,
Option<u64>, // root root: Option<u64>,
Arc<RwLock<BankForks>>, // bank_forks ancestors: Arc<HashMap<u64, HashSet<u64>>>,
); total_staked: u64,
}
fn aggregate_stake_lockouts( fn aggregate_stake_lockouts(
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
fork_confidence_cache: Arc<RwLock<ForkConfidenceCache>>,
) -> (Sender<LockoutAggregationData>, JoinHandle<()>) { ) -> (Sender<LockoutAggregationData>, JoinHandle<()>) {
let (lockouts_sender, lockouts_receiver): ( let (lockouts_sender, lockouts_receiver): (
Sender<LockoutAggregationData>, Sender<LockoutAggregationData>,
@ -818,18 +817,45 @@ fn aggregate_stake_lockouts(
if exit_.load(Ordering::Relaxed) { if exit_.load(Ordering::Relaxed) {
break; break;
} }
if let Ok((lockouts, root, bank_forks)) = lockouts_receiver.try_recv() { if let Ok(aggregation_data) = lockouts_receiver.try_recv() {
let ancestors = bank_forks.read().unwrap().ancestors(); let stake_weighted_lockouts = Tower::aggregate_stake_lockouts(
let stake_weighted_lockouts = aggregation_data.root,
Tower::aggregate_stake_lockouts(root, &ancestors, lockouts); &aggregation_data.ancestors,
let mut w_bank_forks = bank_forks.write().unwrap(); &aggregation_data.lockouts,
);
let mut w_fork_confidence_cache = fork_confidence_cache.write().unwrap();
// Cache the confidence values
for (fork, stake_lockout) in aggregation_data.lockouts.iter() {
if aggregation_data.root.is_none()
|| *fork >= aggregation_data.root.unwrap()
{
w_fork_confidence_cache.cache_fork_confidence(
*fork,
stake_lockout.stake(),
aggregation_data.total_staked,
stake_lockout.lockout(),
);
}
}
// Cache the stake weighted lockouts
for (fork, stake_weighted_lockout) in stake_weighted_lockouts.iter() { for (fork, stake_weighted_lockout) in stake_weighted_lockouts.iter() {
if root.is_none() || *fork >= root.unwrap() { if aggregation_data.root.is_none()
w_bank_forks || *fork >= aggregation_data.root.unwrap()
{
w_fork_confidence_cache
.cache_stake_weighted_lockouts(*fork, *stake_weighted_lockout) .cache_stake_weighted_lockouts(*fork, *stake_weighted_lockout)
} }
} }
drop(w_bank_forks);
if let Some(root) = aggregation_data.root {
w_fork_confidence_cache
.prune_confidence_cache(&aggregation_data.ancestors, root);
}
drop(w_fork_confidence_cache);
} }
}) })
.unwrap(), .unwrap(),
@ -839,9 +865,9 @@ fn aggregate_stake_lockouts(
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use crate::bank_forks::Confidence;
use crate::blocktree::tests::make_slot_entries; use crate::blocktree::tests::make_slot_entries;
use crate::blocktree::{entries_to_test_shreds, get_tmp_ledger_path}; use crate::blocktree::{entries_to_test_shreds, get_tmp_ledger_path};
use crate::confidence::Confidence;
use crate::entry; use crate::entry;
use crate::genesis_utils::{create_genesis_block, create_genesis_block_with_leader}; use crate::genesis_utils::{create_genesis_block, create_genesis_block_with_leader};
use crate::replay_stage::ReplayStage; use crate::replay_stage::ReplayStage;
@ -1036,7 +1062,11 @@ mod test {
bank.store_account(&pubkey, &leader_vote_account); bank.store_account(&pubkey, &leader_vote_account);
} }
let (lockouts_sender, _) = aggregate_stake_lockouts(&Arc::new(AtomicBool::new(false))); let fork_confidence_cache = Arc::new(RwLock::new(ForkConfidenceCache::default()));
let (lockouts_sender, _) = aggregate_stake_lockouts(
&Arc::new(AtomicBool::new(false)),
fork_confidence_cache.clone(),
);
let leader_pubkey = Pubkey::new_rand(); let leader_pubkey = Pubkey::new_rand();
let leader_lamports = 3; let leader_lamports = 3;
@ -1061,10 +1091,13 @@ mod test {
let mut progress = HashMap::new(); let mut progress = HashMap::new();
leader_vote(&arc_bank0, &leader_voting_pubkey); leader_vote(&arc_bank0, &leader_voting_pubkey);
let votable = ReplayStage::generate_votable_banks(&bank_forks, &tower, &mut progress); let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
let votable =
ReplayStage::generate_votable_banks(&ancestors, &bank_forks, &tower, &mut progress);
if let Some((_, _, lockouts, total_staked)) = votable.into_iter().last() { if let Some((_, _, lockouts, total_staked)) = votable.into_iter().last() {
ReplayStage::update_confidence_cache( ReplayStage::update_confidence_cache(
&bank_forks, &ancestors,
&tower, &tower,
lockouts, lockouts,
total_staked, total_staked,
@ -1072,11 +1105,21 @@ mod test {
); );
} }
thread::sleep(Duration::from_millis(200));
assert_eq!( assert_eq!(
bank_forks.read().unwrap().get_fork_confidence(0).unwrap(), fork_confidence_cache
.read()
.unwrap()
.get_fork_confidence(0)
.unwrap(),
&Confidence::new(0, 3, 2) &Confidence::new(0, 3, 2)
); );
assert!(bank_forks.read().unwrap().get_fork_confidence(1).is_none()); assert!(fork_confidence_cache
.read()
.unwrap()
.get_fork_confidence(1)
.is_none());
tower.record_vote(arc_bank0.slot(), arc_bank0.hash()); tower.record_vote(arc_bank0.slot(), arc_bank0.hash());
@ -1089,10 +1132,12 @@ mod test {
bank_forks.write().unwrap().insert(bank1); bank_forks.write().unwrap().insert(bank1);
let arc_bank1 = bank_forks.read().unwrap().get(1).unwrap().clone(); let arc_bank1 = bank_forks.read().unwrap().get(1).unwrap().clone();
leader_vote(&arc_bank1, &leader_voting_pubkey); leader_vote(&arc_bank1, &leader_voting_pubkey);
let votable = ReplayStage::generate_votable_banks(&bank_forks, &tower, &mut progress); let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
let votable =
ReplayStage::generate_votable_banks(&ancestors, &bank_forks, &tower, &mut progress);
if let Some((_, _, lockouts, total_staked)) = votable.into_iter().last() { if let Some((_, _, lockouts, total_staked)) = votable.into_iter().last() {
ReplayStage::update_confidence_cache( ReplayStage::update_confidence_cache(
&bank_forks, &ancestors,
&tower, &tower,
lockouts, lockouts,
total_staked, total_staked,
@ -1111,10 +1156,12 @@ mod test {
bank_forks.write().unwrap().insert(bank2); bank_forks.write().unwrap().insert(bank2);
let arc_bank2 = bank_forks.read().unwrap().get(2).unwrap().clone(); let arc_bank2 = bank_forks.read().unwrap().get(2).unwrap().clone();
leader_vote(&arc_bank2, &leader_voting_pubkey); leader_vote(&arc_bank2, &leader_voting_pubkey);
let votable = ReplayStage::generate_votable_banks(&bank_forks, &tower, &mut progress); let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
let votable =
ReplayStage::generate_votable_banks(&ancestors, &bank_forks, &tower, &mut progress);
if let Some((_, _, lockouts, total_staked)) = votable.into_iter().last() { if let Some((_, _, lockouts, total_staked)) = votable.into_iter().last() {
ReplayStage::update_confidence_cache( ReplayStage::update_confidence_cache(
&bank_forks, &ancestors,
&tower, &tower,
lockouts, lockouts,
total_staked, total_staked,
@ -1124,15 +1171,27 @@ mod test {
thread::sleep(Duration::from_millis(200)); thread::sleep(Duration::from_millis(200));
assert_eq!( assert_eq!(
bank_forks.read().unwrap().get_fork_confidence(0).unwrap(), fork_confidence_cache
.read()
.unwrap()
.get_fork_confidence(0)
.unwrap(),
&Confidence::new_with_stake_weighted(3, 3, 14, 60) &Confidence::new_with_stake_weighted(3, 3, 14, 60)
); );
assert_eq!( assert_eq!(
bank_forks.read().unwrap().get_fork_confidence(1).unwrap(), fork_confidence_cache
.read()
.unwrap()
.get_fork_confidence(1)
.unwrap(),
&Confidence::new_with_stake_weighted(3, 3, 6, 18) &Confidence::new_with_stake_weighted(3, 3, 6, 18)
); );
assert_eq!( assert_eq!(
bank_forks.read().unwrap().get_fork_confidence(2).unwrap(), fork_confidence_cache
.read()
.unwrap()
.get_fork_confidence(2)
.unwrap(),
&Confidence::new_with_stake_weighted(0, 3, 2, 0) &Confidence::new_with_stake_weighted(0, 3, 2, 0)
); );
} }

View File

@ -17,6 +17,7 @@ use crate::blob_fetch_stage::BlobFetchStage;
use crate::blockstream_service::BlockstreamService; use crate::blockstream_service::BlockstreamService;
use crate::blocktree::{Blocktree, CompletedSlotsReceiver}; use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::confidence::ForkConfidenceCache;
use crate::leader_schedule_cache::LeaderScheduleCache; use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::ledger_cleanup_service::LedgerCleanupService; use crate::ledger_cleanup_service::LedgerCleanupService;
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
@ -77,6 +78,7 @@ impl Tvu {
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
completed_slots_receiver: CompletedSlotsReceiver, completed_slots_receiver: CompletedSlotsReceiver,
fork_confidence_cache: Arc<RwLock<ForkConfidenceCache>>,
) -> Self ) -> Self
where where
T: 'static + KeypairUtil + Sync + Send, T: 'static + KeypairUtil + Sync + Send,
@ -153,6 +155,7 @@ impl Tvu {
leader_schedule_cache, leader_schedule_cache,
vec![blockstream_slot_sender, ledger_cleanup_slot_sender], vec![blockstream_slot_sender, ledger_cleanup_slot_sender],
snapshot_package_sender, snapshot_package_sender,
fork_confidence_cache,
); );
let blockstream_service = if blockstream_unix_socket.is_some() { let blockstream_service = if blockstream_unix_socket.is_some() {
@ -258,6 +261,7 @@ pub mod tests {
let voting_keypair = Keypair::new(); let voting_keypair = Keypair::new();
let storage_keypair = Arc::new(Keypair::new()); let storage_keypair = Arc::new(Keypair::new());
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let fork_confidence_cache = Arc::new(RwLock::new(ForkConfidenceCache::default()));
let tvu = Tvu::new( let tvu = Tvu::new(
&voting_keypair.pubkey(), &voting_keypair.pubkey(),
Some(&Arc::new(voting_keypair)), Some(&Arc::new(voting_keypair)),
@ -282,6 +286,7 @@ pub mod tests {
&leader_schedule_cache, &leader_schedule_cache,
&exit, &exit,
completed_slots_receiver, completed_slots_receiver,
fork_confidence_cache,
); );
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
tvu.join().unwrap(); tvu.join().unwrap();

View File

@ -5,6 +5,7 @@ use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
use crate::blocktree_processor::{self, BankForksInfo}; use crate::blocktree_processor::{self, BankForksInfo};
use crate::broadcast_stage::BroadcastStageType; use crate::broadcast_stage::BroadcastStageType;
use crate::cluster_info::{ClusterInfo, Node}; use crate::cluster_info::{ClusterInfo, Node};
use crate::confidence::ForkConfidenceCache;
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use crate::erasure::ErasureConfig; use crate::erasure::ErasureConfig;
use crate::gossip_service::{discover_cluster, GossipService}; use crate::gossip_service::{discover_cluster, GossipService};
@ -301,6 +302,7 @@ impl Validator {
Some(voting_keypair) Some(voting_keypair)
}; };
let fork_confidence_cache = Arc::new(RwLock::new(ForkConfidenceCache::default()));
let tvu = Tvu::new( let tvu = Tvu::new(
vote_account, vote_account,
voting_keypair, voting_keypair,
@ -318,6 +320,7 @@ impl Validator {
&leader_schedule_cache, &leader_schedule_cache,
&exit, &exit,
completed_slots_receiver, completed_slots_receiver,
fork_confidence_cache,
); );
if config.dev_sigverify_disabled { if config.dev_sigverify_disabled {

View File

@ -5,6 +5,7 @@ use log::*;
use solana_core::banking_stage::create_test_recorder; use solana_core::banking_stage::create_test_recorder;
use solana_core::blocktree::{create_new_tmp_ledger, Blocktree}; use solana_core::blocktree::{create_new_tmp_ledger, Blocktree};
use solana_core::cluster_info::{ClusterInfo, Node}; use solana_core::cluster_info::{ClusterInfo, Node};
use solana_core::confidence::ForkConfidenceCache;
use solana_core::entry::next_entry_mut; use solana_core::entry::next_entry_mut;
use solana_core::entry::EntrySlice; use solana_core::entry::EntrySlice;
use solana_core::genesis_utils::{create_genesis_block_with_leader, GenesisBlockInfo}; use solana_core::genesis_utils::{create_genesis_block_with_leader, GenesisBlockInfo};
@ -120,6 +121,7 @@ fn test_replay() {
{ {
let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) = let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&working_bank, &blocktree); create_test_recorder(&working_bank, &blocktree);
let fork_confidence_cache = Arc::new(RwLock::new(ForkConfidenceCache::default()));
let tvu = Tvu::new( let tvu = Tvu::new(
&voting_keypair.pubkey(), &voting_keypair.pubkey(),
Some(&Arc::new(voting_keypair)), Some(&Arc::new(voting_keypair)),
@ -144,6 +146,7 @@ fn test_replay() {
&leader_schedule_cache, &leader_schedule_cache,
&exit, &exit,
completed_slots_receiver, completed_slots_receiver,
fork_confidence_cache,
); );
let mut mint_ref_balance = mint_balance; let mut mint_ref_balance = mint_balance;