More generic accounts purge functions (#14595)

Co-authored-by: Carl Lin <carl@solana.com>
This commit is contained in:
carllin 2021-01-17 20:31:03 -08:00 committed by GitHub
parent 8d4ab1bab1
commit 5f14f45282
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 132 additions and 73 deletions

View File

@ -5,7 +5,6 @@ use crate::{
cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_slots::ClusterSlots,
repair_weight::RepairWeight,
repair_weighted_traversal::Contains,
result::Result,
serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
};
@ -15,7 +14,9 @@ use solana_ledger::{
shred::Nonce,
};
use solana_measure::measure::Measure;
use solana_runtime::{bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE};
use solana_runtime::{
bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE, contains::Contains,
};
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
use std::{
collections::{HashMap, HashSet},
@ -402,12 +403,12 @@ impl RepairService {
}
/// Repairs any fork starting at the input slot
pub fn generate_repairs_for_fork(
pub fn generate_repairs_for_fork<'a>(
blockstore: &Blockstore,
repairs: &mut Vec<RepairType>,
max_repairs: usize,
slot: Slot,
duplicate_slot_repair_statuses: &dyn Contains<Slot>,
duplicate_slot_repair_statuses: &impl Contains<'a, Slot>,
) {
let mut pending_slots = vec![slot];
while repairs.len() < max_repairs && !pending_slots.is_empty() {

View File

@ -1,13 +1,10 @@
use crate::{
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
repair_service::RepairTiming,
repair_weighted_traversal::{self, Contains},
serve_repair::RepairType,
tree_diff::TreeDiff,
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, repair_service::RepairTiming,
repair_weighted_traversal, serve_repair::RepairType, tree_diff::TreeDiff,
};
use solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore};
use solana_measure::measure::Measure;
use solana_runtime::epoch_stakes::EpochStakes;
use solana_runtime::{contains::Contains, epoch_stakes::EpochStakes};
use solana_sdk::{
clock::Slot,
epoch_schedule::{Epoch, EpochSchedule},
@ -129,14 +126,14 @@ impl RepairWeight {
}
}
pub fn get_best_weighted_repairs(
pub fn get_best_weighted_repairs<'a>(
&mut self,
blockstore: &Blockstore,
epoch_stakes: &HashMap<Epoch, EpochStakes>,
epoch_schedule: &EpochSchedule,
max_new_orphans: usize,
max_new_shreds: usize,
ignore_slots: &dyn Contains<Slot>,
ignore_slots: &impl Contains<'a, Slot>,
repair_timing: Option<&mut RepairTiming>,
) -> Vec<RepairType> {
let mut repairs = vec![];
@ -228,12 +225,12 @@ impl RepairWeight {
}
// Generate shred repairs for main subtree rooted at `self.slot`
fn get_best_shreds(
fn get_best_shreds<'a>(
&mut self,
blockstore: &Blockstore,
repairs: &mut Vec<RepairType>,
max_new_shreds: usize,
ignore_slots: &dyn Contains<Slot>,
ignore_slots: &impl Contains<'a, Slot>,
) {
let root_tree = self.trees.get(&self.root).expect("Root tree must exist");
repair_weighted_traversal::get_best_repair_shreds(

View File

@ -3,27 +3,9 @@ use crate::{
serve_repair::RepairType, tree_diff::TreeDiff,
};
use solana_ledger::blockstore::Blockstore;
use solana_runtime::contains::Contains;
use solana_sdk::clock::Slot;
use std::{
cmp::Eq,
collections::{HashMap, HashSet},
hash::Hash,
};
pub trait Contains<T: Eq + Hash> {
fn contains(&self, key: &T) -> bool;
}
impl<T: Eq + Hash, U> Contains<T> for HashMap<T, U> {
fn contains(&self, key: &T) -> bool {
self.contains_key(key)
}
}
impl<T: Eq + Hash> Contains<T> for HashSet<T> {
fn contains(&self, key: &T) -> bool {
self.contains(key)
}
}
use std::collections::{HashMap, HashSet};
#[derive(Debug, PartialEq)]
enum Visit {
@ -84,12 +66,12 @@ impl<'a> Iterator for RepairWeightTraversal<'a> {
}
// Generate shred repairs for main subtree rooted at `self.slot`
pub fn get_best_repair_shreds(
pub fn get_best_repair_shreds<'a>(
tree: &HeaviestSubtreeForkChoice,
blockstore: &Blockstore,
repairs: &mut Vec<RepairType>,
max_new_shreds: usize,
ignore_slots: &dyn Contains<Slot>,
ignore_slots: &impl Contains<'a, Slot>,
) {
let initial_len = repairs.len();
let max_repairs = initial_len + max_new_shreds;

View File

@ -24,6 +24,7 @@ use crate::{
AccountIndex, AccountsIndex, Ancestors, IndexKey, IsCached, SlotList, SlotSlice,
},
append_vec::{AppendVec, StoredAccountMeta, StoredMeta},
contains::Contains,
};
use blake3::traits::digest::Digest;
use dashmap::DashMap;
@ -893,17 +894,20 @@ impl AccountsDB {
}
}
fn purge_keys_exact(
&self,
pubkey_to_slot_set: Vec<(Pubkey, HashSet<Slot>)>,
) -> Vec<(u64, AccountInfo)> {
fn purge_keys_exact<'a, C: 'a>(
&'a self,
pubkey_to_slot_set: &'a [(Pubkey, C)],
) -> Vec<(u64, AccountInfo)>
where
C: Contains<'a, Slot>,
{
let mut reclaims = Vec::new();
let mut dead_keys = Vec::new();
for (pubkey, slots_set) in pubkey_to_slot_set {
let is_empty = self.accounts_index.purge_exact(
&pubkey,
&slots_set,
slots_set,
&mut reclaims,
&self.account_indexes,
);
@ -1079,11 +1083,18 @@ impl AccountsDB {
let pubkey_to_slot_set: Vec<_> = purges
.into_iter()
.map(|(key, (slots_list, _ref_count))| {
(key, slots_list.into_iter().map(|(slot, _)| slot).collect())
(
key,
slots_list
.into_iter()
.map(|(slot, _)| slot)
.collect::<HashSet<Slot>>(),
)
})
.collect();
let reclaims = self.purge_keys_exact(pubkey_to_slot_set);
let reclaims = self.purge_keys_exact(&pubkey_to_slot_set);
self.handle_reclaims(&reclaims, None, false, None);
reclaims_time.stop();
@ -2388,19 +2399,18 @@ impl AccountsDB {
fn purge_slot_cache_keys(&self, dead_slot: Slot, slot_cache: SlotCache) {
// Slot purged from cache should not exist in the backing store
assert!(self.storage.get_slot_stores(dead_slot).is_none());
let dead_slots: HashSet<Slot> = vec![dead_slot].into_iter().collect();
let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
let pubkey_to_slot_set: Vec<(Pubkey, HashSet<Slot>)> = slot_cache
let pubkey_to_slot_set: Vec<(Pubkey, Slot)> = slot_cache
.iter()
.map(|account| {
purged_slot_pubkeys.insert((dead_slot, *account.key()));
(*account.key(), dead_slots.clone())
(*account.key(), dead_slot)
})
.collect();
let num_purged_keys = pubkey_to_slot_set.len();
let reclaims = self.purge_keys_exact(pubkey_to_slot_set);
let reclaims = self.purge_keys_exact(&pubkey_to_slot_set);
assert_eq!(reclaims.len(), num_purged_keys);
self.finalize_dead_slot_removal(&dead_slots, purged_slot_pubkeys, None);
self.finalize_dead_slot_removal(std::iter::once(&dead_slot), purged_slot_pubkeys, None);
}
fn purge_slots(&self, slots: &HashSet<Slot>) {
@ -3478,9 +3488,9 @@ impl AccountsDB {
dead_slots
}
fn finalize_dead_slot_removal(
&self,
dead_slots: &HashSet<Slot>,
fn finalize_dead_slot_removal<'a>(
&'a self,
dead_slots_iter: impl Iterator<Item = &'a Slot> + Clone,
purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
mut purged_account_slots: Option<&mut AccountSlots>,
) {
@ -3491,12 +3501,12 @@ impl AccountsDB {
self.accounts_index.unref_from_storage(&pubkey);
}
for slot in dead_slots.iter() {
for slot in dead_slots_iter.clone() {
self.accounts_index.clean_dead_slot(*slot);
}
{
let mut bank_hashes = self.bank_hashes.write().unwrap();
for slot in dead_slots.iter() {
for slot in dead_slots_iter {
bank_hashes.remove(slot);
}
}
@ -3533,7 +3543,11 @@ impl AccountsDB {
})
})
};
self.finalize_dead_slot_removal(dead_slots, purged_slot_pubkeys, purged_account_slots);
self.finalize_dead_slot_removal(
dead_slots.iter(),
purged_slot_pubkeys,
purged_account_slots,
);
measure.stop();
inc_new_counter_info!("clean_stored_dead_slots-ms", measure.as_ms() as usize);
}
@ -5492,7 +5506,7 @@ pub mod tests {
let slots: HashSet<Slot> = vec![1].into_iter().collect();
let purge_keys = vec![(key1, slots)];
db.purge_keys_exact(purge_keys);
db.purge_keys_exact(&purge_keys);
let account2 = Account::new(3, 0, &key);
db.store_uncached(2, &[(&key1, &account2)]);

View File

@ -1,4 +1,5 @@
use crate::{
contains::Contains,
inline_spl_token_v2_0::{self, SPL_TOKEN_ACCOUNT_MINT_OFFSET, SPL_TOKEN_ACCOUNT_OWNER_OFFSET},
secondary_index::*,
};
@ -529,11 +530,11 @@ impl<T: 'static + Clone + IsCached> AccountsIndex<T> {
(w_account_entry.unwrap(), is_newly_inserted)
}
pub fn handle_dead_keys(&self, dead_keys: &[Pubkey], account_indexes: &HashSet<AccountIndex>) {
pub fn handle_dead_keys(&self, dead_keys: &[&Pubkey], account_indexes: &HashSet<AccountIndex>) {
if !dead_keys.is_empty() {
for key in dead_keys.iter() {
let mut w_index = self.account_maps.write().unwrap();
if let btree_map::Entry::Occupied(index_entry) = w_index.entry(*key) {
if let btree_map::Entry::Occupied(index_entry) = w_index.entry(**key) {
if index_entry.get().slot_list.read().unwrap().is_empty() {
index_entry.remove();
@ -541,7 +542,11 @@ impl<T: 'static + Clone + IsCached> AccountsIndex<T> {
// is only safe because we have the lock for this key's entry
// in the AccountsIndex, so no other thread is also updating
// the index
self.purge_secondary_indexes_by_inner_key(key, None, account_indexes);
self.purge_secondary_indexes_by_inner_key(
key,
None::<&Slot>,
account_indexes,
);
}
}
}
@ -605,13 +610,16 @@ impl<T: 'static + Clone + IsCached> AccountsIndex<T> {
)
}
pub fn purge_exact(
&self,
pub fn purge_exact<'a, C>(
&'a self,
pubkey: &Pubkey,
slots_to_purge: &HashSet<Slot>,
slots_to_purge: &'a C,
reclaims: &mut SlotList<T>,
account_indexes: &HashSet<AccountIndex>,
) -> bool {
) -> bool
where
C: Contains<'a, Slot>,
{
let res = {
let mut write_account_map_entry = self.get_account_write_entry(pubkey).unwrap();
write_account_map_entry.slot_list_mut(|slot_list| {
@ -625,7 +633,7 @@ impl<T: 'static + Clone + IsCached> AccountsIndex<T> {
slot_list.is_empty()
})
};
self.purge_secondary_indexes_by_inner_key(pubkey, Some(&slots_to_purge), account_indexes);
self.purge_secondary_indexes_by_inner_key(pubkey, Some(slots_to_purge), account_indexes);
res
}
@ -803,12 +811,14 @@ impl<T: 'static + Clone + IsCached> AccountsIndex<T> {
}
}
fn purge_secondary_indexes_by_inner_key(
&self,
fn purge_secondary_indexes_by_inner_key<'a, C>(
&'a self,
inner_key: &Pubkey,
slots_to_remove: Option<&HashSet<Slot>>,
slots_to_remove: Option<&'a C>,
account_indexes: &HashSet<AccountIndex>,
) {
) where
C: Contains<'a, Slot>,
{
if account_indexes.contains(&AccountIndex::ProgramId) {
self.program_id_index
.remove_by_inner_key(inner_key, slots_to_remove);
@ -1697,7 +1707,7 @@ pub mod tests {
index.purge_exact(
&account_key,
&slots.into_iter().collect(),
&slots.into_iter().collect::<HashSet<Slot>>(),
&mut vec![],
account_index,
);
@ -1933,7 +1943,7 @@ pub mod tests {
.slot_list_mut(|slot_list| slot_list.clear());
// Everything should be deleted
index.handle_dead_keys(&[account_key], account_index);
index.handle_dead_keys(&[&account_key], account_index);
assert!(index.spl_token_mint_index.index.is_empty());
assert!(index.spl_token_mint_index.reverse_index.is_empty());
}

49
runtime/src/contains.rs Normal file
View File

@ -0,0 +1,49 @@
use std::{
borrow::Borrow,
cmp::Eq,
collections::{HashMap, HashSet},
hash::Hash,
};
pub trait Contains<'a, T: Eq + Hash> {
type Item: Borrow<T>;
type Iter: Iterator<Item = Self::Item>;
fn contains(&self, key: &T) -> bool;
fn contains_iter(&'a self) -> Self::Iter;
}
impl<'a, T: 'a + Eq + Hash, U: 'a> Contains<'a, T> for HashMap<T, U> {
type Item = &'a T;
type Iter = std::collections::hash_map::Keys<'a, T, U>;
fn contains(&self, key: &T) -> bool {
<HashMap<T, U>>::contains_key(self, key)
}
fn contains_iter(&'a self) -> Self::Iter {
self.keys()
}
}
impl<'a, T: 'a + Eq + Hash> Contains<'a, T> for HashSet<T> {
type Item = &'a T;
type Iter = std::collections::hash_set::Iter<'a, T>;
fn contains(&self, key: &T) -> bool {
<HashSet<T>>::contains(self, key)
}
fn contains_iter(&'a self) -> Self::Iter {
self.iter()
}
}
impl<'a, T: 'a + Eq + Hash + Copy> Contains<'a, T> for T {
type Item = &'a T;
type Iter = std::iter::Once<&'a T>;
fn contains(&self, key: &T) -> bool {
key == self
}
fn contains_iter(&'a self) -> Self::Iter {
std::iter::once(self)
}
}

View File

@ -13,6 +13,7 @@ mod blockhash_queue;
pub mod bloom;
pub mod builtins;
pub mod commitment;
pub mod contains;
pub mod epoch_stakes;
pub mod genesis_utils;
pub mod hardened_unpack;

View File

@ -1,7 +1,9 @@
use crate::contains::Contains;
use dashmap::{mapref::entry::Entry::Occupied, DashMap};
use log::*;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::{
borrow::Borrow,
collections::{hash_map, HashMap, HashSet},
fmt::Debug,
sync::{Arc, RwLock},
@ -257,7 +259,10 @@ impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
// Note passing `None` is dangerous unless you're sure there's no other competing threads
// writing updates to the index for this Pubkey at the same time!
pub fn remove_by_inner_key(&self, inner_key: &Pubkey, slots_to_remove: Option<&HashSet<Slot>>) {
pub fn remove_by_inner_key<'a, C>(&'a self, inner_key: &Pubkey, slots_to_remove: Option<&'a C>)
where
C: Contains<'a, Slot>,
{
// Save off which keys in `self.index` had slots removed so we can remove them
// after we purge the reverse index
let mut key_to_removed_slots: HashMap<Pubkey, Vec<Slot>> = HashMap::new();
@ -272,12 +277,12 @@ impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
// Ideally we use a concurrent map here as well to prevent clean
// from blocking writes, but memory usage of DashMap is high
let mut w_slots_map = slots_map.value().write().unwrap();
for slot in slots_to_remove.iter() {
if let Some(removed_key) = w_slots_map.remove(slot) {
for slot in slots_to_remove.contains_iter() {
if let Some(removed_key) = w_slots_map.remove(slot.borrow()) {
key_to_removed_slots
.entry(removed_key)
.or_default()
.push(*slot);
.push(*slot.borrow());
}
}
w_slots_map.is_empty()