removes locked pubkey references (#15152)

This commit is contained in:
behzad nouri 2021-02-08 02:07:00 +00:00 committed by GitHub
parent 96e521be28
commit b6f231b60e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 53 additions and 152 deletions

View File

@ -4,7 +4,6 @@ use crate::{
optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
poh_recorder::PohRecorder,
pubkey_references::LockedPubkeyReferences,
result::{Error, Result},
rpc_subscriptions::RpcSubscriptions,
sigverify,
@ -57,15 +56,15 @@ pub struct SlotVoteTracker {
// Maps pubkeys that have voted for this slot
// to whether or not we've seen the vote on gossip.
// True if seen on gossip, false if only seen in replay.
voted: HashMap<Arc<Pubkey>, bool>,
voted: HashMap<Pubkey, bool>,
optimistic_votes_tracker: HashMap<Hash, VoteStakeTracker>,
updates: Option<Vec<Arc<Pubkey>>>,
updates: Option<Vec<Pubkey>>,
gossip_only_stake: u64,
}
impl SlotVoteTracker {
#[allow(dead_code)]
pub fn get_updates(&mut self) -> Option<Vec<Arc<Pubkey>>> {
pub fn get_updates(&mut self) -> Option<Vec<Pubkey>> {
self.updates.take()
}
@ -85,7 +84,6 @@ pub struct VoteTracker {
epoch_authorized_voters: RwLock<HashMap<Epoch, Arc<EpochAuthorizedVoters>>>,
leader_schedule_epoch: RwLock<Epoch>,
current_epoch: RwLock<Epoch>,
keys: LockedPubkeyReferences,
epoch_schedule: EpochSchedule,
}
@ -157,21 +155,19 @@ impl VoteTracker {
}
#[cfg(test)]
pub fn insert_vote(&self, slot: Slot, pubkey: Arc<Pubkey>) {
pub fn insert_vote(&self, slot: Slot, pubkey: Pubkey) {
let mut w_slot_vote_trackers = self.slot_vote_trackers.write().unwrap();
let slot_vote_tracker = w_slot_vote_trackers.entry(slot).or_default();
let mut w_slot_vote_tracker = slot_vote_tracker.write().unwrap();
w_slot_vote_tracker.voted.insert(pubkey.clone(), true);
w_slot_vote_tracker.voted.insert(pubkey, true);
if let Some(ref mut updates) = w_slot_vote_tracker.updates {
updates.push(pubkey.clone())
updates.push(pubkey)
} else {
w_slot_vote_tracker.updates = Some(vec![pubkey.clone()]);
w_slot_vote_tracker.updates = Some(vec![pubkey]);
}
self.keys.get_or_insert(&pubkey);
}
fn progress_leader_schedule_epoch(&self, root_bank: &Bank) {
@ -221,7 +217,6 @@ impl VoteTracker {
.write()
.unwrap()
.retain(|epoch, _| *epoch >= root_epoch);
self.keys.purge();
*self.current_epoch.write().unwrap() = root_epoch;
}
}
@ -547,7 +542,7 @@ impl ClusterInfoVoteListener {
root_bank: &Bank,
subscriptions: &RpcSubscriptions,
verified_vote_sender: &VerifiedVoteSender,
diff: &mut HashMap<Slot, HashMap<Arc<Pubkey>, bool>>,
diff: &mut HashMap<Slot, HashMap<Pubkey, bool>>,
new_optimistic_confirmed_slots: &mut Vec<(Slot, Hash)>,
is_gossip_vote: bool,
bank_notification_sender: &Option<BankNotificationSender>,
@ -572,7 +567,6 @@ impl ClusterInfoVoteListener {
continue;
}
let epoch_stakes = epoch_stakes.unwrap();
let unduplicated_pubkey = vote_tracker.keys.get_or_insert(&vote_pubkey);
// The last vote slot, which is the greatest slot in the stack
// of votes in a vote transaction, qualifies for optimistic confirmation.
@ -591,7 +585,7 @@ impl ClusterInfoVoteListener {
vote_tracker,
last_vote_slot,
last_vote_hash,
unduplicated_pubkey.clone(),
*vote_pubkey,
stake,
total_stake,
);
@ -626,7 +620,7 @@ impl ClusterInfoVoteListener {
diff.entry(slot)
.or_default()
.entry(unduplicated_pubkey)
.entry(*vote_pubkey)
.and_modify(|seen_in_gossip_previously| {
*seen_in_gossip_previously = *seen_in_gossip_previously || is_gossip_vote
})
@ -681,7 +675,7 @@ impl ClusterInfoVoteListener {
verified_vote_sender: &VerifiedVoteSender,
bank_notification_sender: &Option<BankNotificationSender>,
) -> Vec<(Slot, Hash)> {
let mut diff: HashMap<Slot, HashMap<Arc<Pubkey>, bool>> = HashMap::new();
let mut diff: HashMap<Slot, HashMap<Pubkey, bool>> = HashMap::new();
let mut new_optimistic_confirmed_slots = vec![];
// Process votes from gossip and ReplayStage
@ -748,9 +742,7 @@ impl ClusterInfoVoteListener {
// no other writers to `slot_vote_tracker` that
// `is_new || is_new_from_gossip`. In both cases we want to record
// `is_new_from_gossip` for the `pubkey` entry.
w_slot_tracker
.voted
.insert(pubkey.clone(), seen_in_gossip_above);
w_slot_tracker.voted.insert(pubkey, seen_in_gossip_above);
w_slot_tracker.updates.as_mut().unwrap().push(pubkey);
}
@ -765,7 +757,7 @@ impl ClusterInfoVoteListener {
vote_tracker: &VoteTracker,
slot: Slot,
hash: Hash,
pubkey: Arc<Pubkey>,
pubkey: Pubkey,
stake: u64,
total_epoch_stake: u64,
) -> (bool, bool) {
@ -902,10 +894,10 @@ mod tests {
let (vote_tracker, bank, _, _) = setup();
// Check outdated slots are purged with new root
let new_voter = Arc::new(solana_sdk::pubkey::new_rand());
let new_voter = solana_sdk::pubkey::new_rand();
// Make separate copy so the original doesn't count toward
// the ref count, which would prevent cleanup
let new_voter_ = Arc::new(*new_voter);
let new_voter_ = new_voter;
vote_tracker.insert_vote(bank.slot(), new_voter_);
assert!(vote_tracker
.slot_vote_trackers
@ -922,7 +914,6 @@ mod tests {
// Check `keys` and `epoch_authorized_voters` are purged when new
// root bank moves to the next epoch
assert!(vote_tracker.keys.0.read().unwrap().contains(&new_voter));
let current_epoch = bank.epoch();
let new_epoch_bank = Bank::new_from_parent(
&bank,
@ -931,7 +922,6 @@ mod tests {
.get_first_slot_in_epoch(current_epoch + 1),
);
vote_tracker.progress_with_new_root_bank(&new_epoch_bank);
assert!(!vote_tracker.keys.0.read().unwrap().contains(&new_voter));
assert_eq!(
*vote_tracker.current_epoch.read().unwrap(),
current_epoch + 1
@ -1431,13 +1421,6 @@ mod tests {
#[test]
fn test_vote_tracker_references() {
// The number of references that get stored for a pubkey every time
// a vote is added to the tracking set via a transaction. One stored in the
// SlotVoteTracker.voted, one in SlotVoteTracker.updates, one in
// SlotVoteTracker.optimistic_votes_tracker
let ref_count_per_vote = 3;
let ref_count_per_new_key = 1;
// Create some voters at genesis
let validator_keypairs: Vec<_> =
(0..2).map(|_| ValidatorVoteKeypairs::new_rand()).collect();
@ -1492,22 +1475,6 @@ mod tests {
&verified_vote_sender,
&None,
);
let ref_count = Arc::strong_count(
&vote_tracker
.keys
.0
.read()
.unwrap()
.get(&validator0_keypairs.vote_keypair.pubkey())
.unwrap(),
);
// This new pubkey submitted a vote for a slot, so ref count is
// `ref_count_per_vote + ref_count_per_new_key`.
// +ref_count_per_new_key for the new pubkey in `vote_tracker.keys` and
// +ref_count_per_vote for the one new vote
let mut current_ref_count = ref_count_per_vote + ref_count_per_new_key;
assert_eq!(ref_count, current_ref_count);
// Setup next epoch
let old_epoch = bank.get_leader_schedule_epoch(bank.slot());
@ -1562,35 +1529,6 @@ mod tests {
&verified_vote_sender,
&None,
);
// Check new replay vote pubkey first
let ref_count = Arc::strong_count(
&vote_tracker
.keys
.0
.read()
.unwrap()
.get(&validator_keypairs[1].vote_keypair.pubkey())
.unwrap(),
);
// This new pubkey submitted a replay vote for a slot, so ref count is
// `ref_count_per_optimistic_vote + ref_count_per_new_key`.
// +ref_count_per_new_key for the new pubkey in `vote_tracker.keys` and
// +ref_count_per_optimistic_vote for the one new vote
assert_eq!(ref_count, ref_count_per_vote + ref_count_per_new_key);
// Check the existing pubkey
let ref_count = Arc::strong_count(
&vote_tracker
.keys
.0
.read()
.unwrap()
.get(&validator0_keypairs.vote_keypair.pubkey())
.unwrap(),
);
current_ref_count += 2 * ref_count_per_vote;
assert_eq!(ref_count, current_ref_count);
}
fn setup() -> (

View File

@ -1,6 +1,6 @@
use crate::{
cluster_info::ClusterInfo, contact_info::ContactInfo, epoch_slots::EpochSlots,
pubkey_references::LockedPubkeyReferences, serve_repair::RepairType,
serve_repair::RepairType,
};
use solana_runtime::{bank_forks::BankForks, epoch_stakes::NodeIdToVoteAccounts};
use solana_sdk::{clock::Slot, pubkey::Pubkey};
@ -9,13 +9,12 @@ use std::{
sync::{Arc, RwLock},
};
pub type SlotPubkeys = HashMap<Arc<Pubkey>, u64>;
pub type SlotPubkeys = HashMap<Pubkey, u64>;
pub type ClusterSlotsMap = RwLock<HashMap<Slot, Arc<RwLock<SlotPubkeys>>>>;
#[derive(Default)]
pub struct ClusterSlots {
cluster_slots: ClusterSlotsMap,
keys: LockedPubkeyReferences,
since: RwLock<Option<u64>>,
validator_stakes: RwLock<Arc<NodeIdToVoteAccounts>>,
epoch: RwLock<Option<u64>>,
@ -40,12 +39,10 @@ impl ClusterSlots {
if *slot <= root {
continue;
}
let unduplicated_pubkey = self.keys.get_or_insert(&epoch_slots.from);
self.insert_node_id(*slot, unduplicated_pubkey);
self.insert_node_id(*slot, epoch_slots.from);
}
}
self.cluster_slots.write().unwrap().retain(|x, _| *x > root);
self.keys.purge();
*self.since.write().unwrap() = since;
}
@ -60,7 +57,7 @@ impl ClusterSlots {
.collect()
}
pub fn insert_node_id(&self, slot: Slot, node_id: Arc<Pubkey>) {
pub fn insert_node_id(&self, slot: Slot, node_id: Pubkey) {
let balance = self
.validator_stakes
.read()
@ -241,8 +238,8 @@ mod tests {
let mut map = HashMap::new();
let k1 = solana_sdk::pubkey::new_rand();
let k2 = solana_sdk::pubkey::new_rand();
map.insert(Arc::new(k1), std::u64::MAX / 2);
map.insert(Arc::new(k2), 0);
map.insert(k1, std::u64::MAX / 2);
map.insert(k2, 0);
cs.cluster_slots
.write()
.unwrap()
@ -263,14 +260,14 @@ mod tests {
let mut map = HashMap::new();
let k1 = solana_sdk::pubkey::new_rand();
let k2 = solana_sdk::pubkey::new_rand();
map.insert(Arc::new(k2), 0);
map.insert(k2, 0);
cs.cluster_slots
.write()
.unwrap()
.insert(0, Arc::new(RwLock::new(map)));
//make sure default weights are used as well
let validator_stakes: HashMap<_, _> = vec![(
*Arc::new(k1),
k1,
NodeVoteAccounts {
total_stake: std::u64::MAX / 2,
vote_accounts: vec![Pubkey::default()],
@ -317,7 +314,7 @@ mod tests {
// Mark the first validator as completed slot 9, should pick that validator,
// even though it only has default stake, while the other validator has
// max stake
cs.insert_node_id(slot, Arc::new(contact_infos[0].id));
cs.insert_node_id(slot, contact_infos[0].id);
assert_eq!(
cs.compute_weights_exclude_noncomplete(slot, &contact_infos),
vec![(1, 0)]

View File

@ -47,7 +47,6 @@ pub mod ping_pong;
pub mod poh_recorder;
pub mod poh_service;
pub mod progress_map;
pub mod pubkey_references;
pub mod repair_response;
pub mod repair_service;
pub mod repair_weight;

View File

@ -1,24 +0,0 @@
use solana_sdk::pubkey::Pubkey;
use std::{
collections::HashSet,
sync::{Arc, RwLock},
};
#[derive(Default)]
pub struct LockedPubkeyReferences(pub RwLock<HashSet<Arc<Pubkey>>>);
impl LockedPubkeyReferences {
pub fn get_or_insert(&self, pubkey: &Pubkey) -> Arc<Pubkey> {
let mut cached_pubkey = self.0.read().unwrap().get(pubkey).cloned();
if cached_pubkey.is_none() {
let new_pubkey = Arc::new(*pubkey);
self.0.write().unwrap().insert(new_pubkey.clone());
cached_pubkey = Some(new_pubkey);
}
cached_pubkey.unwrap()
}
pub fn purge(&self) {
self.0.write().unwrap().retain(|x| Arc::strong_count(x) > 1);
}
}

View File

@ -1048,7 +1048,7 @@ mod test {
// a valid target for repair
let dead_slot = 9;
let cluster_slots = ClusterSlots::default();
cluster_slots.insert_node_id(dead_slot, Arc::new(valid_repair_peer.id));
cluster_slots.insert_node_id(dead_slot, valid_repair_peer.id);
cluster_info.insert_info(valid_repair_peer);
// Not enough time has passed, should not update the
@ -1178,7 +1178,7 @@ mod test {
let cluster_slots = ClusterSlots::default();
let duplicate_slot_repair_statuses = HashMap::new();
let keypairs = ValidatorVoteKeypairs::new_rand();
let only_node_id = Arc::new(keypairs.node_keypair.pubkey());
let only_node_id = keypairs.node_keypair.pubkey();
let GenesisConfigInfo { genesis_config, .. } =
genesis_utils::create_genesis_config_with_vote_accounts(
1_000_000_000,

View File

@ -43,7 +43,6 @@ use solana_sdk::{
use solana_vote_program::{vote_instruction, vote_state::Vote};
use std::{
collections::{HashMap, HashSet},
ops::Deref,
result,
sync::{
atomic::{AtomicBool, Ordering},
@ -1675,8 +1674,8 @@ impl ReplayStage {
fn update_fork_propagated_threshold_from_votes(
progress: &mut ProgressMap,
mut newly_voted_pubkeys: Vec<impl Deref<Target = Pubkey>>,
mut cluster_slot_pubkeys: Vec<impl Deref<Target = Pubkey>>,
mut newly_voted_pubkeys: Vec<Pubkey>,
mut cluster_slot_pubkeys: Vec<Pubkey>,
fork_tip: Slot,
bank_forks: &RwLock<BankForks>,
) {
@ -1733,8 +1732,8 @@ impl ReplayStage {
}
fn update_slot_propagated_threshold_from_votes(
newly_voted_pubkeys: &mut Vec<impl Deref<Target = Pubkey>>,
cluster_slot_pubkeys: &mut Vec<impl Deref<Target = Pubkey>>,
newly_voted_pubkeys: &mut Vec<Pubkey>,
cluster_slot_pubkeys: &mut Vec<Pubkey>,
leader_bank: &Bank,
leader_propagated_stats: &mut PropagatedStats,
did_child_reach_threshold: bool,
@ -1772,7 +1771,7 @@ impl ReplayStage {
.propagated_validators
.contains(vote_pubkey);
leader_propagated_stats.add_vote_pubkey(
**vote_pubkey,
*vote_pubkey,
leader_bank.epoch_vote_account_stake(&vote_pubkey),
);
!exists
@ -1781,7 +1780,7 @@ impl ReplayStage {
cluster_slot_pubkeys.retain(|node_pubkey| {
let exists = leader_propagated_stats
.propagated_node_ids
.contains(&**node_pubkey);
.contains(node_pubkey);
leader_propagated_stats.add_node_pubkey(&*node_pubkey, leader_bank);
!exists
});
@ -1901,11 +1900,11 @@ impl ReplayStage {
&leader,
subscriptions,
);
let empty: Vec<&Pubkey> = vec![];
let empty: Vec<Pubkey> = vec![];
Self::update_fork_propagated_threshold_from_votes(
progress,
empty,
vec![&leader],
vec![leader],
parent_bank.slot(),
bank_forks,
);
@ -3136,17 +3135,9 @@ pub(crate) mod tests {
for i in 0..std::cmp::max(new_vote_pubkeys.len(), new_node_pubkeys.len()) {
propagated_stats.is_propagated = false;
let len = std::cmp::min(i, new_vote_pubkeys.len());
let mut voted_pubkeys = new_vote_pubkeys[..len]
.iter()
.cloned()
.map(Arc::new)
.collect();
let mut voted_pubkeys = new_vote_pubkeys[..len].iter().copied().collect();
let len = std::cmp::min(i, new_node_pubkeys.len());
let mut node_pubkeys = new_node_pubkeys[..len]
.iter()
.cloned()
.map(Arc::new)
.collect();
let mut node_pubkeys = new_node_pubkeys[..len].iter().copied().collect();
let did_newly_reach_threshold =
ReplayStage::update_slot_propagated_threshold_from_votes(
&mut voted_pubkeys,
@ -3163,14 +3154,14 @@ pub(crate) mod tests {
if i == 0 || i >= new_vote_pubkeys.len() {
vec![]
} else {
vec![Arc::new(new_vote_pubkeys[i - 1])]
vec![new_vote_pubkeys[i - 1]]
}
};
let remaining_node_pubkeys = {
if i == 0 || i >= new_node_pubkeys.len() {
vec![]
} else {
vec![Arc::new(new_node_pubkeys[i - 1])]
vec![new_node_pubkeys[i - 1]]
}
};
assert_eq!(voted_pubkeys, remaining_vote_pubkeys);
@ -3191,7 +3182,7 @@ pub(crate) mod tests {
#[test]
fn test_update_slot_propagated_threshold_from_votes2() {
let mut empty: Vec<&Pubkey> = vec![];
let mut empty: Vec<Pubkey> = vec![];
let genesis_config = create_genesis_config(100_000_000).genesis_config;
let root_bank = Bank::new(&genesis_config);
let stake = 10_000;
@ -3204,7 +3195,7 @@ pub(crate) mod tests {
};
propagated_stats.total_epoch_stake = stake * 10;
let child_reached_threshold = true;
let mut newly_voted_pubkeys: Vec<Arc<Pubkey>> = vec![];
let mut newly_voted_pubkeys: Vec<Pubkey> = vec![];
assert!(ReplayStage::update_slot_propagated_threshold_from_votes(
&mut newly_voted_pubkeys,
@ -3245,7 +3236,7 @@ pub(crate) mod tests {
// Create genesis stakers
let vote_keypairs = ValidatorVoteKeypairs::new_rand();
let node_pubkey = vote_keypairs.node_keypair.pubkey();
let vote_pubkey = Arc::new(vote_keypairs.vote_keypair.pubkey());
let vote_pubkey = vote_keypairs.vote_keypair.pubkey();
let keypairs: HashMap<_, _> = vec![(node_pubkey, vote_keypairs)].into_iter().collect();
let stake = 10_000;
let (mut bank_forks, mut progress_map, _) = initialize_state(&keypairs, stake);
@ -3291,7 +3282,7 @@ pub(crate) mod tests {
assert!(!progress_map.is_propagated(10));
let vote_tracker = VoteTracker::new(&bank_forks.root_bank());
vote_tracker.insert_vote(10, vote_pubkey.clone());
vote_tracker.insert_vote(10, vote_pubkey);
ReplayStage::update_propagation_status(
&mut progress_map,
10,
@ -3319,7 +3310,7 @@ pub(crate) mod tests {
// The voter should be recorded
assert!(propagated_stats
.propagated_validators
.contains(&*vote_pubkey));
.contains(&vote_pubkey));
assert_eq!(propagated_stats.propagated_validators_stake, stake);
}
@ -3378,7 +3369,7 @@ pub(crate) mod tests {
let vote_tracker = VoteTracker::new(&bank_forks.root_bank());
for vote_pubkey in &vote_pubkeys {
// Insert a vote for the last bank for each voter
vote_tracker.insert_vote(10, Arc::new(*vote_pubkey));
vote_tracker.insert_vote(10, *vote_pubkey);
}
// The last bank should reach propagation threshold, and propagate it all
@ -3464,7 +3455,7 @@ pub(crate) mod tests {
let vote_tracker = VoteTracker::new(&bank_forks.root_bank());
// Insert a new vote
vote_tracker.insert_vote(10, Arc::new(vote_pubkeys[2]));
vote_tracker.insert_vote(10, vote_pubkeys[2]);
// The last bank should reach propagation threshold, and propagate it all
// the way back through earlier leader banks
@ -3827,7 +3818,7 @@ pub(crate) mod tests {
// Add votes
for vote_key in validator_voting_keys.values() {
vote_tracker.insert_vote(root_bank.slot(), Arc::new(*vote_key));
vote_tracker.insert_vote(root_bank.slot(), *vote_key);
}
assert!(!progress.is_propagated(root_bank.slot()));

View File

@ -1,10 +1,10 @@
use solana_runtime::commitment::VOTE_THRESHOLD_SIZE;
use solana_sdk::pubkey::Pubkey;
use std::{collections::HashSet, sync::Arc};
use std::collections::HashSet;
#[derive(Default)]
pub struct VoteStakeTracker {
voted: HashSet<Arc<Pubkey>>,
voted: HashSet<Pubkey>,
stake: u64,
}
@ -15,7 +15,7 @@ impl VoteStakeTracker {
// `is_new` is true if the vote has not been seen before
pub fn add_vote_pubkey(
&mut self,
vote_pubkey: Arc<Pubkey>,
vote_pubkey: Pubkey,
stake: u64,
total_stake: u64,
) -> (bool, bool) {
@ -35,7 +35,7 @@ impl VoteStakeTracker {
}
}
pub fn voted(&self) -> &HashSet<Arc<Pubkey>> {
pub fn voted(&self) -> &HashSet<Pubkey> {
&self.voted
}
@ -53,12 +53,12 @@ mod test {
let total_epoch_stake = 10;
let mut vote_stake_tracker = VoteStakeTracker::default();
for i in 0..10 {
let pubkey = Arc::new(solana_sdk::pubkey::new_rand());
let pubkey = solana_sdk::pubkey::new_rand();
let (is_confirmed, is_new) =
vote_stake_tracker.add_vote_pubkey(pubkey.clone(), 1, total_epoch_stake);
vote_stake_tracker.add_vote_pubkey(pubkey, 1, total_epoch_stake);
let stake = vote_stake_tracker.stake();
let (is_confirmed2, is_new2) =
vote_stake_tracker.add_vote_pubkey(pubkey.clone(), 1, total_epoch_stake);
vote_stake_tracker.add_vote_pubkey(pubkey, 1, total_epoch_stake);
let stake2 = vote_stake_tracker.stake();
// Stake should not change from adding same pubkey twice