Check ClusterSlots for confirmation of block propagation (#9115)
This commit is contained in:
parent
24d887a38a
commit
66946a4680
|
@ -482,6 +482,7 @@ pub mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{
|
use crate::{
|
||||||
cluster_info_vote_listener::VoteTracker,
|
cluster_info_vote_listener::VoteTracker,
|
||||||
|
cluster_slots::ClusterSlots,
|
||||||
progress_map::ForkProgress,
|
progress_map::ForkProgress,
|
||||||
replay_stage::{HeaviestForkFailures, ReplayStage},
|
replay_stage::{HeaviestForkFailures, ReplayStage},
|
||||||
};
|
};
|
||||||
|
@ -612,6 +613,7 @@ pub mod test {
|
||||||
tower,
|
tower,
|
||||||
progress,
|
progress,
|
||||||
&VoteTracker::default(),
|
&VoteTracker::default(),
|
||||||
|
&ClusterSlots::default(),
|
||||||
bank_forks,
|
bank_forks,
|
||||||
&mut HashSet::new(),
|
&mut HashSet::new(),
|
||||||
);
|
);
|
||||||
|
|
|
@ -1,13 +1,13 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
cluster_info_vote_listener::SlotVoteTracker, consensus::StakeLockout,
|
cluster_info_vote_listener::SlotVoteTracker, cluster_slots::SlotPubkeys,
|
||||||
replay_stage::SUPERMINORITY_THRESHOLD,
|
consensus::StakeLockout, replay_stage::SUPERMINORITY_THRESHOLD,
|
||||||
};
|
};
|
||||||
use solana_ledger::{
|
use solana_ledger::{
|
||||||
bank_forks::BankForks,
|
bank_forks::BankForks,
|
||||||
blockstore_processor::{ConfirmationProgress, ConfirmationTiming},
|
blockstore_processor::{ConfirmationProgress, ConfirmationTiming},
|
||||||
};
|
};
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
|
use solana_sdk::{account::Account, clock::Slot, hash::Hash, pubkey::Pubkey};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
rc::Rc,
|
rc::Rc,
|
||||||
|
@ -179,14 +179,84 @@ pub(crate) struct ForkStats {
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
pub(crate) struct PropagatedStats {
|
pub(crate) struct PropagatedStats {
|
||||||
pub(crate) propagated_validators: HashSet<Rc<Pubkey>>,
|
pub(crate) propagated_validators: HashSet<Rc<Pubkey>>,
|
||||||
|
pub(crate) propagated_node_ids: HashSet<Rc<Pubkey>>,
|
||||||
pub(crate) propagated_validators_stake: u64,
|
pub(crate) propagated_validators_stake: u64,
|
||||||
pub(crate) is_propagated: bool,
|
pub(crate) is_propagated: bool,
|
||||||
pub(crate) is_leader_slot: bool,
|
pub(crate) is_leader_slot: bool,
|
||||||
pub(crate) prev_leader_slot: Option<Slot>,
|
pub(crate) prev_leader_slot: Option<Slot>,
|
||||||
pub(crate) slot_vote_tracker: Option<Arc<RwLock<SlotVoteTracker>>>,
|
pub(crate) slot_vote_tracker: Option<Arc<RwLock<SlotVoteTracker>>>,
|
||||||
|
pub(crate) cluster_slot_pubkeys: Option<Arc<RwLock<SlotPubkeys>>>,
|
||||||
pub(crate) total_epoch_stake: u64,
|
pub(crate) total_epoch_stake: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl PropagatedStats {
|
||||||
|
pub fn add_vote_pubkey(
|
||||||
|
&mut self,
|
||||||
|
vote_pubkey: &Pubkey,
|
||||||
|
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
||||||
|
stake: u64,
|
||||||
|
) {
|
||||||
|
if !self.propagated_validators.contains(vote_pubkey) {
|
||||||
|
let mut cached_pubkey: Option<Rc<Pubkey>> = all_pubkeys.get(vote_pubkey).cloned();
|
||||||
|
if cached_pubkey.is_none() {
|
||||||
|
let new_pubkey = Rc::new(*vote_pubkey);
|
||||||
|
all_pubkeys.insert(new_pubkey.clone());
|
||||||
|
cached_pubkey = Some(new_pubkey);
|
||||||
|
}
|
||||||
|
let vote_pubkey = cached_pubkey.unwrap();
|
||||||
|
self.propagated_validators.insert(vote_pubkey);
|
||||||
|
self.propagated_validators_stake += stake;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_node_pubkey(
|
||||||
|
&mut self,
|
||||||
|
node_pubkey: &Pubkey,
|
||||||
|
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
||||||
|
bank: &Bank,
|
||||||
|
) {
|
||||||
|
if !self.propagated_node_ids.contains(node_pubkey) {
|
||||||
|
let node_vote_accounts = bank
|
||||||
|
.epoch_vote_accounts_for_node_id(&node_pubkey)
|
||||||
|
.map(|v| &v.vote_accounts);
|
||||||
|
|
||||||
|
if let Some(node_vote_accounts) = node_vote_accounts {
|
||||||
|
self.add_node_pubkey_internal(
|
||||||
|
node_pubkey,
|
||||||
|
all_pubkeys,
|
||||||
|
node_vote_accounts,
|
||||||
|
bank.epoch_vote_accounts(bank.epoch())
|
||||||
|
.expect("Epoch stakes for bank's own epoch must exist"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_node_pubkey_internal(
|
||||||
|
&mut self,
|
||||||
|
node_pubkey: &Pubkey,
|
||||||
|
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
||||||
|
vote_account_pubkeys: &[Pubkey],
|
||||||
|
epoch_vote_accounts: &HashMap<Pubkey, (u64, Account)>,
|
||||||
|
) {
|
||||||
|
let mut cached_pubkey: Option<Rc<Pubkey>> = all_pubkeys.get(node_pubkey).cloned();
|
||||||
|
if cached_pubkey.is_none() {
|
||||||
|
let new_pubkey = Rc::new(*node_pubkey);
|
||||||
|
all_pubkeys.insert(new_pubkey.clone());
|
||||||
|
cached_pubkey = Some(new_pubkey);
|
||||||
|
}
|
||||||
|
let node_pubkey = cached_pubkey.unwrap();
|
||||||
|
self.propagated_node_ids.insert(node_pubkey);
|
||||||
|
for vote_account_pubkey in vote_account_pubkeys.iter() {
|
||||||
|
let stake = epoch_vote_accounts
|
||||||
|
.get(vote_account_pubkey)
|
||||||
|
.map(|(stake, _)| *stake)
|
||||||
|
.unwrap_or(0);
|
||||||
|
self.add_vote_pubkey(vote_account_pubkey, all_pubkeys, stake);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub(crate) struct ProgressMap {
|
pub(crate) struct ProgressMap {
|
||||||
progress_map: HashMap<Slot, ForkProgress>,
|
progress_map: HashMap<Slot, ForkProgress>,
|
||||||
|
@ -288,6 +358,120 @@ impl ProgressMap {
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_add_vote_pubkey() {
|
||||||
|
let mut stats = PropagatedStats::default();
|
||||||
|
let mut all_pubkeys = HashSet::new();
|
||||||
|
let mut vote_pubkey = Pubkey::new_rand();
|
||||||
|
all_pubkeys.insert(Rc::new(vote_pubkey.clone()));
|
||||||
|
|
||||||
|
// Add a vote pubkey, the number of references in all_pubkeys
|
||||||
|
// should be 2
|
||||||
|
stats.add_vote_pubkey(&vote_pubkey, &mut all_pubkeys, 1);
|
||||||
|
assert!(stats.propagated_validators.contains(&vote_pubkey));
|
||||||
|
assert_eq!(stats.propagated_validators_stake, 1);
|
||||||
|
assert_eq!(Rc::strong_count(all_pubkeys.get(&vote_pubkey).unwrap()), 2);
|
||||||
|
|
||||||
|
// Adding it again should change no state since the key already existed
|
||||||
|
stats.add_vote_pubkey(&vote_pubkey, &mut all_pubkeys, 1);
|
||||||
|
assert!(stats.propagated_validators.contains(&vote_pubkey));
|
||||||
|
assert_eq!(stats.propagated_validators_stake, 1);
|
||||||
|
|
||||||
|
// Addding another pubkey should succeed
|
||||||
|
vote_pubkey = Pubkey::new_rand();
|
||||||
|
stats.add_vote_pubkey(&vote_pubkey, &mut all_pubkeys, 2);
|
||||||
|
assert!(stats.propagated_validators.contains(&vote_pubkey));
|
||||||
|
assert_eq!(stats.propagated_validators_stake, 3);
|
||||||
|
assert_eq!(Rc::strong_count(all_pubkeys.get(&vote_pubkey).unwrap()), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_add_node_pubkey_internal() {
|
||||||
|
let num_vote_accounts = 10;
|
||||||
|
let staked_vote_accounts = 5;
|
||||||
|
let vote_account_pubkeys: Vec<_> = std::iter::repeat_with(|| Pubkey::new_rand())
|
||||||
|
.take(num_vote_accounts)
|
||||||
|
.collect();
|
||||||
|
let epoch_vote_accounts: HashMap<_, _> = vote_account_pubkeys
|
||||||
|
.iter()
|
||||||
|
.skip(num_vote_accounts - staked_vote_accounts)
|
||||||
|
.map(|pubkey| (*pubkey, (1, Account::default())))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut stats = PropagatedStats::default();
|
||||||
|
let mut all_pubkeys = HashSet::new();
|
||||||
|
let mut node_pubkey = Pubkey::new_rand();
|
||||||
|
all_pubkeys.insert(Rc::new(node_pubkey.clone()));
|
||||||
|
|
||||||
|
// Add a vote pubkey, the number of references in all_pubkeys
|
||||||
|
// should be 2
|
||||||
|
stats.add_node_pubkey_internal(
|
||||||
|
&node_pubkey,
|
||||||
|
&mut all_pubkeys,
|
||||||
|
&vote_account_pubkeys,
|
||||||
|
&epoch_vote_accounts,
|
||||||
|
);
|
||||||
|
assert!(stats.propagated_node_ids.contains(&node_pubkey));
|
||||||
|
assert_eq!(
|
||||||
|
stats.propagated_validators_stake,
|
||||||
|
staked_vote_accounts as u64
|
||||||
|
);
|
||||||
|
assert_eq!(Rc::strong_count(all_pubkeys.get(&node_pubkey).unwrap()), 2);
|
||||||
|
|
||||||
|
// Adding it again should not change any state
|
||||||
|
stats.add_node_pubkey_internal(
|
||||||
|
&node_pubkey,
|
||||||
|
&mut all_pubkeys,
|
||||||
|
&vote_account_pubkeys,
|
||||||
|
&epoch_vote_accounts,
|
||||||
|
);
|
||||||
|
assert!(stats.propagated_node_ids.contains(&node_pubkey));
|
||||||
|
assert_eq!(
|
||||||
|
stats.propagated_validators_stake,
|
||||||
|
staked_vote_accounts as u64
|
||||||
|
);
|
||||||
|
|
||||||
|
// Addding another pubkey with same vote accounts should succeed, but stake
|
||||||
|
// shouldn't increase
|
||||||
|
node_pubkey = Pubkey::new_rand();
|
||||||
|
stats.add_node_pubkey_internal(
|
||||||
|
&node_pubkey,
|
||||||
|
&mut all_pubkeys,
|
||||||
|
&vote_account_pubkeys,
|
||||||
|
&epoch_vote_accounts,
|
||||||
|
);
|
||||||
|
assert!(stats.propagated_node_ids.contains(&node_pubkey));
|
||||||
|
assert_eq!(
|
||||||
|
stats.propagated_validators_stake,
|
||||||
|
staked_vote_accounts as u64
|
||||||
|
);
|
||||||
|
assert_eq!(Rc::strong_count(all_pubkeys.get(&node_pubkey).unwrap()), 2);
|
||||||
|
|
||||||
|
// Addding another pubkey with different vote accounts should succeed
|
||||||
|
// and increase stake
|
||||||
|
node_pubkey = Pubkey::new_rand();
|
||||||
|
let vote_account_pubkeys: Vec<_> = std::iter::repeat_with(|| Pubkey::new_rand())
|
||||||
|
.take(num_vote_accounts)
|
||||||
|
.collect();
|
||||||
|
let epoch_vote_accounts: HashMap<_, _> = vote_account_pubkeys
|
||||||
|
.iter()
|
||||||
|
.skip(num_vote_accounts - staked_vote_accounts)
|
||||||
|
.map(|pubkey| (*pubkey, (1, Account::default())))
|
||||||
|
.collect();
|
||||||
|
stats.add_node_pubkey_internal(
|
||||||
|
&node_pubkey,
|
||||||
|
&mut all_pubkeys,
|
||||||
|
&vote_account_pubkeys,
|
||||||
|
&epoch_vote_accounts,
|
||||||
|
);
|
||||||
|
assert!(stats.propagated_node_ids.contains(&node_pubkey));
|
||||||
|
assert_eq!(
|
||||||
|
stats.propagated_validators_stake,
|
||||||
|
2 * staked_vote_accounts as u64
|
||||||
|
);
|
||||||
|
assert_eq!(Rc::strong_count(all_pubkeys.get(&node_pubkey).unwrap()), 2);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_is_propagated_status_on_construction() {
|
fn test_is_propagated_status_on_construction() {
|
||||||
// If the given ValidatorStakeInfo == None, then this is not
|
// If the given ValidatorStakeInfo == None, then this is not
|
||||||
|
|
|
@ -4,6 +4,7 @@ use crate::{
|
||||||
broadcast_stage::RetransmitSlotsSender,
|
broadcast_stage::RetransmitSlotsSender,
|
||||||
cluster_info::ClusterInfo,
|
cluster_info::ClusterInfo,
|
||||||
cluster_info_vote_listener::VoteTracker,
|
cluster_info_vote_listener::VoteTracker,
|
||||||
|
cluster_slots::ClusterSlots,
|
||||||
commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData},
|
commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData},
|
||||||
consensus::{StakeLockout, Tower},
|
consensus::{StakeLockout, Tower},
|
||||||
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
|
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
|
||||||
|
@ -114,6 +115,7 @@ impl ReplayStage {
|
||||||
ledger_signal_receiver: Receiver<bool>,
|
ledger_signal_receiver: Receiver<bool>,
|
||||||
poh_recorder: Arc<Mutex<PohRecorder>>,
|
poh_recorder: Arc<Mutex<PohRecorder>>,
|
||||||
vote_tracker: Arc<VoteTracker>,
|
vote_tracker: Arc<VoteTracker>,
|
||||||
|
cluster_slots: Arc<ClusterSlots>,
|
||||||
retransmit_slots_sender: RetransmitSlotsSender,
|
retransmit_slots_sender: RetransmitSlotsSender,
|
||||||
) -> (Self, Receiver<Vec<Arc<Bank>>>) {
|
) -> (Self, Receiver<Vec<Arc<Bank>>>) {
|
||||||
let ReplayStageConfig {
|
let ReplayStageConfig {
|
||||||
|
@ -231,6 +233,7 @@ impl ReplayStage {
|
||||||
&tower,
|
&tower,
|
||||||
&mut progress,
|
&mut progress,
|
||||||
&vote_tracker,
|
&vote_tracker,
|
||||||
|
&cluster_slots,
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
&mut all_pubkeys,
|
&mut all_pubkeys,
|
||||||
);
|
);
|
||||||
|
@ -288,10 +291,11 @@ impl ReplayStage {
|
||||||
progress.get_propagated_stats(latest_leader_slot)
|
progress.get_propagated_stats(latest_leader_slot)
|
||||||
{
|
{
|
||||||
info!(
|
info!(
|
||||||
"total staked: {}, observed staked: {}, pubkeys: {:?}, latest_leader_slot: {}, epoch: {:?}",
|
"total staked: {}, observed staked: {}, vote pubkeys: {:?}, node_pubkeys: {:?}, latest_leader_slot: {}, epoch: {:?}",
|
||||||
stats.total_epoch_stake,
|
stats.total_epoch_stake,
|
||||||
stats.propagated_validators_stake,
|
stats.propagated_validators_stake,
|
||||||
stats.propagated_validators,
|
stats.propagated_validators,
|
||||||
|
stats.propagated_node_ids,
|
||||||
latest_leader_slot,
|
latest_leader_slot,
|
||||||
bank_forks.read().unwrap().get(latest_leader_slot).map(|x| x.epoch()),
|
bank_forks.read().unwrap().get(latest_leader_slot).map(|x| x.epoch()),
|
||||||
);
|
);
|
||||||
|
@ -916,6 +920,7 @@ impl ReplayStage {
|
||||||
tower: &Tower,
|
tower: &Tower,
|
||||||
progress: &mut ProgressMap,
|
progress: &mut ProgressMap,
|
||||||
vote_tracker: &VoteTracker,
|
vote_tracker: &VoteTracker,
|
||||||
|
cluster_slots: &ClusterSlots,
|
||||||
bank_forks: &RwLock<BankForks>,
|
bank_forks: &RwLock<BankForks>,
|
||||||
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
||||||
) -> Vec<Slot> {
|
) -> Vec<Slot> {
|
||||||
|
@ -976,6 +981,7 @@ impl ReplayStage {
|
||||||
all_pubkeys,
|
all_pubkeys,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
vote_tracker,
|
vote_tracker,
|
||||||
|
cluster_slots,
|
||||||
);
|
);
|
||||||
|
|
||||||
let stats = progress
|
let stats = progress
|
||||||
|
@ -1000,6 +1006,7 @@ impl ReplayStage {
|
||||||
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
||||||
bank_forks: &RwLock<BankForks>,
|
bank_forks: &RwLock<BankForks>,
|
||||||
vote_tracker: &VoteTracker,
|
vote_tracker: &VoteTracker,
|
||||||
|
cluster_slots: &ClusterSlots,
|
||||||
) {
|
) {
|
||||||
// If propagation has already been confirmed, return
|
// If propagation has already been confirmed, return
|
||||||
if progress.is_propagated(slot) {
|
if progress.is_propagated(slot) {
|
||||||
|
@ -1021,14 +1028,33 @@ impl ReplayStage {
|
||||||
.slot_vote_tracker = slot_vote_tracker.clone();
|
.slot_vote_tracker = slot_vote_tracker.clone();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut cluster_slot_pubkeys = progress
|
||||||
|
.get_propagated_stats(slot)
|
||||||
|
.expect("All frozen banks must exist in the Progress map")
|
||||||
|
.cluster_slot_pubkeys
|
||||||
|
.clone();
|
||||||
|
|
||||||
|
if cluster_slot_pubkeys.is_none() {
|
||||||
|
cluster_slot_pubkeys = cluster_slots.lookup(slot);
|
||||||
|
progress
|
||||||
|
.get_propagated_stats_mut(slot)
|
||||||
|
.expect("All frozen banks must exist in the Progress map")
|
||||||
|
.cluster_slot_pubkeys = cluster_slot_pubkeys.clone();
|
||||||
|
}
|
||||||
|
|
||||||
let newly_voted_pubkeys = slot_vote_tracker
|
let newly_voted_pubkeys = slot_vote_tracker
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|slot_vote_tracker| slot_vote_tracker.write().unwrap().get_updates())
|
.and_then(|slot_vote_tracker| slot_vote_tracker.write().unwrap().get_updates())
|
||||||
.unwrap_or_else(|| vec![]);
|
.unwrap_or_else(|| vec![]);
|
||||||
|
|
||||||
|
let cluster_slot_pubkeys = cluster_slot_pubkeys
|
||||||
|
.map(|v| v.read().unwrap().keys().cloned().collect())
|
||||||
|
.unwrap_or_else(|| vec![]);
|
||||||
|
|
||||||
Self::update_fork_propagated_threshold_from_votes(
|
Self::update_fork_propagated_threshold_from_votes(
|
||||||
progress,
|
progress,
|
||||||
newly_voted_pubkeys,
|
newly_voted_pubkeys,
|
||||||
|
cluster_slot_pubkeys,
|
||||||
slot,
|
slot,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
all_pubkeys,
|
all_pubkeys,
|
||||||
|
@ -1252,6 +1278,7 @@ impl ReplayStage {
|
||||||
fn update_fork_propagated_threshold_from_votes(
|
fn update_fork_propagated_threshold_from_votes(
|
||||||
progress: &mut ProgressMap,
|
progress: &mut ProgressMap,
|
||||||
mut newly_voted_pubkeys: Vec<impl Deref<Target = Pubkey>>,
|
mut newly_voted_pubkeys: Vec<impl Deref<Target = Pubkey>>,
|
||||||
|
mut cluster_slot_pubkeys: Vec<impl Deref<Target = Pubkey>>,
|
||||||
fork_tip: Slot,
|
fork_tip: Slot,
|
||||||
bank_forks: &RwLock<BankForks>,
|
bank_forks: &RwLock<BankForks>,
|
||||||
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
||||||
|
@ -1271,14 +1298,15 @@ impl ReplayStage {
|
||||||
.expect("current_leader_slot > root, so must exist in the progress map");
|
.expect("current_leader_slot > root, so must exist in the progress map");
|
||||||
|
|
||||||
// If a descendant has reached propagation threshold, then
|
// If a descendant has reached propagation threshold, then
|
||||||
// all its ancestor banks have alsso reached propagation
|
// all its ancestor banks have also reached propagation
|
||||||
// threshold as well (Validators can't have voted for a
|
// threshold as well (Validators can't have voted for a
|
||||||
// descendant without also getting the ancestor block)
|
// descendant without also getting the ancestor block)
|
||||||
if leader_propagated_stats.is_propagated ||
|
if leader_propagated_stats.is_propagated ||
|
||||||
// If there's no new validators to record, and there's no
|
// If there's no new validators to record, and there's no
|
||||||
// newly achieved threshold, then there's no further
|
// newly achieved threshold, then there's no further
|
||||||
// information to propagate backwards to past leader blocks
|
// information to propagate backwards to past leader blocks
|
||||||
(newly_voted_pubkeys.is_empty() && !did_newly_reach_threshold)
|
(newly_voted_pubkeys.is_empty() && cluster_slot_pubkeys.is_empty() &&
|
||||||
|
!did_newly_reach_threshold)
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1296,6 +1324,7 @@ impl ReplayStage {
|
||||||
|
|
||||||
did_newly_reach_threshold = Self::update_slot_propagated_threshold_from_votes(
|
did_newly_reach_threshold = Self::update_slot_propagated_threshold_from_votes(
|
||||||
&mut newly_voted_pubkeys,
|
&mut newly_voted_pubkeys,
|
||||||
|
&mut cluster_slot_pubkeys,
|
||||||
&leader_bank,
|
&leader_bank,
|
||||||
leader_propagated_stats,
|
leader_propagated_stats,
|
||||||
all_pubkeys,
|
all_pubkeys,
|
||||||
|
@ -1309,6 +1338,7 @@ impl ReplayStage {
|
||||||
|
|
||||||
fn update_slot_propagated_threshold_from_votes(
|
fn update_slot_propagated_threshold_from_votes(
|
||||||
newly_voted_pubkeys: &mut Vec<impl Deref<Target = Pubkey>>,
|
newly_voted_pubkeys: &mut Vec<impl Deref<Target = Pubkey>>,
|
||||||
|
cluster_slot_pubkeys: &mut Vec<impl Deref<Target = Pubkey>>,
|
||||||
leader_bank: &Bank,
|
leader_bank: &Bank,
|
||||||
leader_propagated_stats: &mut PropagatedStats,
|
leader_propagated_stats: &mut PropagatedStats,
|
||||||
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
||||||
|
@ -1335,45 +1365,42 @@ impl ReplayStage {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove the valdators that we already know voted for this slot
|
// Remove the vote/node pubkeys that we already know voted for this
|
||||||
// Those validators are safe to drop because they don't to be ported back any
|
// slot. These vote accounts/validator identities are safe to drop
|
||||||
// further because parents must have:
|
// because they don't to be ported back any further because earler
|
||||||
// 1) Also recorded this validator already, or
|
// parents must have:
|
||||||
|
// 1) Also recorded these pubkeyss already, or
|
||||||
// 2) Already reached the propagation threshold, in which case
|
// 2) Already reached the propagation threshold, in which case
|
||||||
// they no longer need to track the set of propagated validators
|
// they no longer need to track the set of propagated validators
|
||||||
newly_voted_pubkeys.retain(|voting_pubkey| {
|
newly_voted_pubkeys.retain(|vote_pubkey| {
|
||||||
if !leader_propagated_stats
|
let exists = leader_propagated_stats
|
||||||
.propagated_validators
|
.propagated_validators
|
||||||
.contains(&**voting_pubkey)
|
.contains(&**vote_pubkey);
|
||||||
{
|
leader_propagated_stats.add_vote_pubkey(
|
||||||
let mut cached_pubkey: Option<Rc<Pubkey>> =
|
&*vote_pubkey,
|
||||||
all_pubkeys.get(&**voting_pubkey).cloned();
|
all_pubkeys,
|
||||||
if cached_pubkey.is_none() {
|
leader_bank.epoch_vote_account_stake(&vote_pubkey),
|
||||||
let new_pubkey = Rc::new(**voting_pubkey);
|
);
|
||||||
all_pubkeys.insert(new_pubkey.clone());
|
!exists
|
||||||
cached_pubkey = Some(new_pubkey);
|
|
||||||
}
|
|
||||||
let voting_pubkey = cached_pubkey.unwrap();
|
|
||||||
leader_propagated_stats
|
|
||||||
.propagated_validators
|
|
||||||
.insert(voting_pubkey.clone());
|
|
||||||
leader_propagated_stats.propagated_validators_stake +=
|
|
||||||
leader_bank.epoch_vote_account_stake(&voting_pubkey);
|
|
||||||
|
|
||||||
if leader_propagated_stats.total_epoch_stake == 0
|
|
||||||
|| leader_propagated_stats.propagated_validators_stake as f64
|
|
||||||
/ leader_propagated_stats.total_epoch_stake as f64
|
|
||||||
> SUPERMINORITY_THRESHOLD
|
|
||||||
{
|
|
||||||
leader_propagated_stats.is_propagated = true;
|
|
||||||
did_newly_reach_threshold = true
|
|
||||||
}
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
cluster_slot_pubkeys.retain(|node_pubkey| {
|
||||||
|
let exists = leader_propagated_stats
|
||||||
|
.propagated_node_ids
|
||||||
|
.contains(&**node_pubkey);
|
||||||
|
leader_propagated_stats.add_node_pubkey(&*node_pubkey, all_pubkeys, leader_bank);
|
||||||
|
!exists
|
||||||
|
});
|
||||||
|
|
||||||
|
if leader_propagated_stats.total_epoch_stake == 0
|
||||||
|
|| leader_propagated_stats.propagated_validators_stake as f64
|
||||||
|
/ leader_propagated_stats.total_epoch_stake as f64
|
||||||
|
> SUPERMINORITY_THRESHOLD
|
||||||
|
{
|
||||||
|
leader_propagated_stats.is_propagated = true;
|
||||||
|
did_newly_reach_threshold = true
|
||||||
|
}
|
||||||
|
|
||||||
did_newly_reach_threshold
|
did_newly_reach_threshold
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1484,20 +1511,15 @@ impl ReplayStage {
|
||||||
&rewards_recorder_sender,
|
&rewards_recorder_sender,
|
||||||
subscriptions,
|
subscriptions,
|
||||||
);
|
);
|
||||||
if let Some(leader_vote_accounts) =
|
let empty: Vec<&Pubkey> = vec![];
|
||||||
child_bank.epoch_vote_accounts_for_node_id(&leader)
|
Self::update_fork_propagated_threshold_from_votes(
|
||||||
{
|
progress,
|
||||||
Self::update_fork_propagated_threshold_from_votes(
|
empty,
|
||||||
progress,
|
vec![&leader],
|
||||||
leader_vote_accounts
|
parent_bank.slot(),
|
||||||
.vote_accounts
|
bank_forks,
|
||||||
.iter()
|
all_pubkeys,
|
||||||
.collect::<Vec<_>>(),
|
);
|
||||||
parent_bank.slot(),
|
|
||||||
bank_forks,
|
|
||||||
all_pubkeys,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
new_banks.insert(child_slot, child_bank);
|
new_banks.insert(child_slot, child_bank);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1623,6 +1645,7 @@ pub(crate) mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
let vote_tracker = VoteTracker::default();
|
let vote_tracker = VoteTracker::default();
|
||||||
|
let cluster_slots = ClusterSlots::default();
|
||||||
let mut towers: Vec<Tower> = iter::repeat_with(|| Tower::new_for_tests(8, 0.67))
|
let mut towers: Vec<Tower> = iter::repeat_with(|| Tower::new_for_tests(8, 0.67))
|
||||||
.take(validators.len())
|
.take(validators.len())
|
||||||
.collect();
|
.collect();
|
||||||
|
@ -1789,6 +1812,7 @@ pub(crate) mod tests {
|
||||||
&towers[i],
|
&towers[i],
|
||||||
&mut fork_progresses[i],
|
&mut fork_progresses[i],
|
||||||
&vote_tracker,
|
&vote_tracker,
|
||||||
|
&cluster_slots,
|
||||||
&wrapped_bank_fork,
|
&wrapped_bank_fork,
|
||||||
&mut all_pubkeys,
|
&mut all_pubkeys,
|
||||||
);
|
);
|
||||||
|
@ -2572,6 +2596,7 @@ pub(crate) mod tests {
|
||||||
&tower,
|
&tower,
|
||||||
&mut progress,
|
&mut progress,
|
||||||
&VoteTracker::default(),
|
&VoteTracker::default(),
|
||||||
|
&ClusterSlots::default(),
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
&mut HashSet::new(),
|
&mut HashSet::new(),
|
||||||
);
|
);
|
||||||
|
@ -2609,6 +2634,7 @@ pub(crate) mod tests {
|
||||||
&tower,
|
&tower,
|
||||||
&mut progress,
|
&mut progress,
|
||||||
&VoteTracker::default(),
|
&VoteTracker::default(),
|
||||||
|
&ClusterSlots::default(),
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
&mut HashSet::new(),
|
&mut HashSet::new(),
|
||||||
);
|
);
|
||||||
|
@ -2641,6 +2667,7 @@ pub(crate) mod tests {
|
||||||
&tower,
|
&tower,
|
||||||
&mut progress,
|
&mut progress,
|
||||||
&VoteTracker::default(),
|
&VoteTracker::default(),
|
||||||
|
&ClusterSlots::default(),
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
&mut HashSet::new(),
|
&mut HashSet::new(),
|
||||||
);
|
);
|
||||||
|
@ -2699,6 +2726,7 @@ pub(crate) mod tests {
|
||||||
&tower,
|
&tower,
|
||||||
&mut progress,
|
&mut progress,
|
||||||
&VoteTracker::default(),
|
&VoteTracker::default(),
|
||||||
|
&ClusterSlots::default(),
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
&mut HashSet::new(),
|
&mut HashSet::new(),
|
||||||
);
|
);
|
||||||
|
@ -2763,27 +2791,76 @@ pub(crate) mod tests {
|
||||||
.take(10)
|
.take(10)
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let vote_pubkeys: Vec<_> = keypairs
|
let new_vote_pubkeys: Vec<_> = keypairs
|
||||||
.values()
|
.values()
|
||||||
.map(|keys| keys.vote_keypair.pubkey())
|
.map(|keys| keys.vote_keypair.pubkey())
|
||||||
.collect();
|
.collect();
|
||||||
|
let new_node_pubkeys: Vec<_> = keypairs
|
||||||
|
.values()
|
||||||
|
.map(|keys| keys.node_keypair.pubkey())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Once 4/10 validators have voted, we have hit threshold
|
||||||
|
run_test_update_slot_propagated_threshold_from_votes(&keypairs, &new_vote_pubkeys, &[], 4);
|
||||||
|
// Adding the same node pubkey's instead of the corresponding
|
||||||
|
// vote pubkeys should be equivalent
|
||||||
|
run_test_update_slot_propagated_threshold_from_votes(&keypairs, &[], &new_node_pubkeys, 4);
|
||||||
|
// Adding the same node pubkey's in the same order as their
|
||||||
|
// corresponding vote accounts is redundant, so we don't
|
||||||
|
// reach the threshold any sooner.
|
||||||
|
run_test_update_slot_propagated_threshold_from_votes(
|
||||||
|
&keypairs,
|
||||||
|
&new_vote_pubkeys,
|
||||||
|
&new_node_pubkeys,
|
||||||
|
4,
|
||||||
|
);
|
||||||
|
// However, if we add different node pubkey's than the
|
||||||
|
// vote accounts, we should hit threshold much faster
|
||||||
|
// because now we are getting 2 new pubkeys on each
|
||||||
|
// iteration instead of 1, so by the 2nd iteration
|
||||||
|
// we should have 4/10 validators voting
|
||||||
|
run_test_update_slot_propagated_threshold_from_votes(
|
||||||
|
&keypairs,
|
||||||
|
&new_vote_pubkeys[0..5],
|
||||||
|
&new_node_pubkeys[5..],
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_test_update_slot_propagated_threshold_from_votes(
|
||||||
|
all_keypairs: &HashMap<Pubkey, ValidatorVoteKeypairs>,
|
||||||
|
new_vote_pubkeys: &[Pubkey],
|
||||||
|
new_node_pubkeys: &[Pubkey],
|
||||||
|
success_index: usize,
|
||||||
|
) {
|
||||||
let stake = 10_000;
|
let stake = 10_000;
|
||||||
let (bank_forks, _) = initialize_state(&keypairs, stake);
|
let (bank_forks, _) = initialize_state(&all_keypairs, stake);
|
||||||
let root_bank = bank_forks.root_bank().clone();
|
let root_bank = bank_forks.root_bank().clone();
|
||||||
let mut propagated_stats = PropagatedStats {
|
let mut propagated_stats = PropagatedStats {
|
||||||
total_epoch_stake: stake * 10,
|
total_epoch_stake: stake * all_keypairs.len() as u64,
|
||||||
..PropagatedStats::default()
|
..PropagatedStats::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut all_pubkeys = HashSet::new();
|
let mut all_pubkeys = HashSet::new();
|
||||||
let mut child_reached_threshold = false;
|
let child_reached_threshold = false;
|
||||||
for i in 0..10 {
|
for i in 0..std::cmp::max(new_vote_pubkeys.len(), new_node_pubkeys.len()) {
|
||||||
propagated_stats.is_propagated = false;
|
propagated_stats.is_propagated = false;
|
||||||
let mut newly_voted_pubkeys = vote_pubkeys[..i].iter().cloned().map(Arc::new).collect();
|
let len = std::cmp::min(i, new_vote_pubkeys.len());
|
||||||
|
let mut voted_pubkeys = new_vote_pubkeys[..len]
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(Arc::new)
|
||||||
|
.collect();
|
||||||
|
let len = std::cmp::min(i, new_node_pubkeys.len());
|
||||||
|
let mut node_pubkeys = new_node_pubkeys[..len]
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(Arc::new)
|
||||||
|
.collect();
|
||||||
let did_newly_reach_threshold =
|
let did_newly_reach_threshold =
|
||||||
ReplayStage::update_slot_propagated_threshold_from_votes(
|
ReplayStage::update_slot_propagated_threshold_from_votes(
|
||||||
&mut newly_voted_pubkeys,
|
&mut voted_pubkeys,
|
||||||
|
&mut node_pubkeys,
|
||||||
&root_bank,
|
&root_bank,
|
||||||
&mut propagated_stats,
|
&mut propagated_stats,
|
||||||
&mut all_pubkeys,
|
&mut all_pubkeys,
|
||||||
|
@ -2792,20 +2869,28 @@ pub(crate) mod tests {
|
||||||
|
|
||||||
// Only the i'th voted pubkey should be new (everything else was
|
// Only the i'th voted pubkey should be new (everything else was
|
||||||
// inserted in previous iteration of the loop), so those redundant
|
// inserted in previous iteration of the loop), so those redundant
|
||||||
// pubkeys should be filtered out
|
// pubkeys should have been filtered out
|
||||||
let added_pubkeys = {
|
let remaining_vote_pubkeys = {
|
||||||
if i == 0 {
|
if i == 0 || i >= new_vote_pubkeys.len() {
|
||||||
vec![]
|
vec![]
|
||||||
} else {
|
} else {
|
||||||
vec![Arc::new(vote_pubkeys[i - 1])]
|
vec![Arc::new(new_vote_pubkeys[i - 1])]
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
assert_eq!(newly_voted_pubkeys, added_pubkeys);
|
let remaining_node_pubkeys = {
|
||||||
|
if i == 0 || i >= new_node_pubkeys.len() {
|
||||||
|
vec![]
|
||||||
|
} else {
|
||||||
|
vec![Arc::new(new_node_pubkeys[i - 1])]
|
||||||
|
}
|
||||||
|
};
|
||||||
|
assert_eq!(voted_pubkeys, remaining_vote_pubkeys);
|
||||||
|
assert_eq!(node_pubkeys, remaining_node_pubkeys);
|
||||||
|
|
||||||
// If we crossed the superminority threshold, then
|
// If we crossed the superminority threshold, then
|
||||||
// `did_newly_reach_threshold == true`, otherwise the
|
// `did_newly_reach_threshold == true`, otherwise the
|
||||||
// threshold has not been reached
|
// threshold has not been reached
|
||||||
if i >= 4 {
|
if i >= success_index {
|
||||||
assert!(propagated_stats.is_propagated);
|
assert!(propagated_stats.is_propagated);
|
||||||
assert!(did_newly_reach_threshold);
|
assert!(did_newly_reach_threshold);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2813,21 +2898,29 @@ pub(crate) mod tests {
|
||||||
assert!(!did_newly_reach_threshold);
|
assert!(!did_newly_reach_threshold);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_update_slot_propagated_threshold_from_votes2() {
|
||||||
|
let mut empty: Vec<&Pubkey> = vec![];
|
||||||
|
let genesis_config = create_genesis_config(100_000_000).genesis_config;
|
||||||
|
let root_bank = Bank::new(&genesis_config);
|
||||||
|
let stake = 10_000;
|
||||||
// Simulate a child slot seeing threshold (`child_reached_threshold` = true),
|
// Simulate a child slot seeing threshold (`child_reached_threshold` = true),
|
||||||
// then the parent should also be marked as having reached threshold,
|
// then the parent should also be marked as having reached threshold,
|
||||||
// even if there are no new pubkeys to add (`newly_voted_pubkeys.is_empty()`)
|
// even if there are no new pubkeys to add (`newly_voted_pubkeys.is_empty()`)
|
||||||
propagated_stats = PropagatedStats {
|
let mut propagated_stats = PropagatedStats {
|
||||||
total_epoch_stake: stake * 10,
|
total_epoch_stake: stake * 10,
|
||||||
..PropagatedStats::default()
|
..PropagatedStats::default()
|
||||||
};
|
};
|
||||||
propagated_stats.total_epoch_stake = stake * 10;
|
propagated_stats.total_epoch_stake = stake * 10;
|
||||||
all_pubkeys = HashSet::new();
|
let mut all_pubkeys = HashSet::new();
|
||||||
child_reached_threshold = true;
|
let child_reached_threshold = true;
|
||||||
let mut newly_voted_pubkeys: Vec<Arc<Pubkey>> = vec![];
|
let mut newly_voted_pubkeys: Vec<Arc<Pubkey>> = vec![];
|
||||||
|
|
||||||
assert!(ReplayStage::update_slot_propagated_threshold_from_votes(
|
assert!(ReplayStage::update_slot_propagated_threshold_from_votes(
|
||||||
&mut newly_voted_pubkeys,
|
&mut newly_voted_pubkeys,
|
||||||
|
&mut empty,
|
||||||
&root_bank,
|
&root_bank,
|
||||||
&mut propagated_stats,
|
&mut propagated_stats,
|
||||||
&mut all_pubkeys,
|
&mut all_pubkeys,
|
||||||
|
@ -2842,19 +2935,20 @@ pub(crate) mod tests {
|
||||||
};
|
};
|
||||||
propagated_stats.is_propagated = true;
|
propagated_stats.is_propagated = true;
|
||||||
all_pubkeys = HashSet::new();
|
all_pubkeys = HashSet::new();
|
||||||
child_reached_threshold = true;
|
|
||||||
newly_voted_pubkeys = vec![];
|
newly_voted_pubkeys = vec![];
|
||||||
assert!(!ReplayStage::update_slot_propagated_threshold_from_votes(
|
assert!(!ReplayStage::update_slot_propagated_threshold_from_votes(
|
||||||
&mut newly_voted_pubkeys,
|
&mut newly_voted_pubkeys,
|
||||||
|
&mut empty,
|
||||||
&root_bank,
|
&root_bank,
|
||||||
&mut propagated_stats,
|
&mut propagated_stats,
|
||||||
&mut all_pubkeys,
|
&mut all_pubkeys,
|
||||||
child_reached_threshold,
|
child_reached_threshold,
|
||||||
));
|
));
|
||||||
|
|
||||||
child_reached_threshold = false;
|
let child_reached_threshold = false;
|
||||||
assert!(!ReplayStage::update_slot_propagated_threshold_from_votes(
|
assert!(!ReplayStage::update_slot_propagated_threshold_from_votes(
|
||||||
&mut newly_voted_pubkeys,
|
&mut newly_voted_pubkeys,
|
||||||
|
&mut empty,
|
||||||
&root_bank,
|
&root_bank,
|
||||||
&mut propagated_stats,
|
&mut propagated_stats,
|
||||||
&mut all_pubkeys,
|
&mut all_pubkeys,
|
||||||
|
@ -2923,6 +3017,7 @@ pub(crate) mod tests {
|
||||||
&mut HashSet::new(),
|
&mut HashSet::new(),
|
||||||
&RwLock::new(bank_forks),
|
&RwLock::new(bank_forks),
|
||||||
&vote_tracker,
|
&vote_tracker,
|
||||||
|
&ClusterSlots::default(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let propagated_stats = &progress_map.get(&10).unwrap().propagated_stats;
|
let propagated_stats = &progress_map.get(&10).unwrap().propagated_stats;
|
||||||
|
@ -3009,6 +3104,7 @@ pub(crate) mod tests {
|
||||||
&mut HashSet::new(),
|
&mut HashSet::new(),
|
||||||
&RwLock::new(bank_forks),
|
&RwLock::new(bank_forks),
|
||||||
&vote_tracker,
|
&vote_tracker,
|
||||||
|
&ClusterSlots::default(),
|
||||||
);
|
);
|
||||||
|
|
||||||
for i in 1..=10 {
|
for i in 1..=10 {
|
||||||
|
@ -3096,6 +3192,7 @@ pub(crate) mod tests {
|
||||||
&mut HashSet::new(),
|
&mut HashSet::new(),
|
||||||
&RwLock::new(bank_forks),
|
&RwLock::new(bank_forks),
|
||||||
&vote_tracker,
|
&vote_tracker,
|
||||||
|
&ClusterSlots::default(),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Only the first 5 banks should have reached the threshold
|
// Only the first 5 banks should have reached the threshold
|
||||||
|
|
|
@ -160,7 +160,7 @@ impl Tvu {
|
||||||
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
|
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
|
||||||
cfg,
|
cfg,
|
||||||
tvu_config.shred_version,
|
tvu_config.shred_version,
|
||||||
cluster_slots,
|
cluster_slots.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
|
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
|
||||||
|
@ -198,6 +198,7 @@ impl Tvu {
|
||||||
ledger_signal_receiver,
|
ledger_signal_receiver,
|
||||||
poh_recorder.clone(),
|
poh_recorder.clone(),
|
||||||
vote_tracker,
|
vote_tracker,
|
||||||
|
cluster_slots,
|
||||||
retransmit_slots_sender,
|
retransmit_slots_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue