Add an RPC API that can query the list of Top N secondary index keys and their sizes (#28887)
Co-authored-by: K-anon <IntokuSatori@users.noreply.github.com>
This commit is contained in:
parent
b345d97f67
commit
1e3d6349aa
|
@ -2505,6 +2505,14 @@ impl AccountsDb {
|
|||
AccountsDb::new_for_tests_with_caching(Vec::new(), &ClusterType::Development)
|
||||
}
|
||||
|
||||
pub fn new_single_for_tests_with_secondary_indexes(
|
||||
secondary_indexes: AccountSecondaryIndexes,
|
||||
) -> Self {
|
||||
let mut accounts_db = AccountsDb::new_single_for_tests();
|
||||
accounts_db.account_indexes = secondary_indexes;
|
||||
accounts_db
|
||||
}
|
||||
|
||||
fn next_id(&self) -> AppendVecId {
|
||||
let next_id = self.next_id.fetch_add(1, Ordering::AcqRel);
|
||||
assert!(next_id != AppendVecId::MAX, "We've run out of storage ids!");
|
||||
|
@ -9228,23 +9236,27 @@ pub mod tests {
|
|||
use {
|
||||
super::*,
|
||||
crate::{
|
||||
accounts::Accounts,
|
||||
accounts_hash::MERKLE_FANOUT,
|
||||
accounts_index::{
|
||||
tests::*, AccountSecondaryIndexesIncludeExclude, ReadAccountMapEntry, RefCount,
|
||||
tests::*, AccountIndex, AccountSecondaryIndexes,
|
||||
AccountSecondaryIndexesIncludeExclude, ReadAccountMapEntry, RefCount,
|
||||
},
|
||||
append_vec::{test_utils::TempFile, AccountMeta, StoredMeta},
|
||||
cache_hash_data_stats::CacheHashDataStats,
|
||||
inline_spl_token,
|
||||
secondary_index::MAX_NUM_LARGEST_INDEX_KEYS_RETURNED,
|
||||
},
|
||||
assert_matches::assert_matches,
|
||||
itertools::Itertools,
|
||||
rand::{prelude::SliceRandom, thread_rng, Rng},
|
||||
rand::{distributions::Uniform, prelude::SliceRandom, thread_rng, Rng},
|
||||
solana_sdk::{
|
||||
account::{
|
||||
accounts_equal, Account, AccountSharedData, ReadableAccount, WritableAccount,
|
||||
},
|
||||
hash::HASH_BYTES,
|
||||
pubkey::PUBKEY_BYTES,
|
||||
system_program,
|
||||
},
|
||||
std::{
|
||||
iter::FromIterator,
|
||||
|
@ -17694,4 +17706,215 @@ pub mod tests {
|
|||
let hashes = hashes.into_iter().collect();
|
||||
AccountsHasher::compute_merkle_root_recurse(hashes, MERKLE_FANOUT)
|
||||
}
|
||||
|
||||
#[test]
/// End-to-end exercise of the "largest secondary-index keys" tracking:
/// builds an AccountsDb with the ProgramId secondary index enabled, inserts
/// accounts while maintaining an independent expected list, and verifies
/// `get_largest_keys` against it through inserts, partial queries, and
/// removals (via zero-lamport overwrites + clean).
fn test_get_largest_keys() {
    solana_logger::setup();
    // Constants
    const NUM_DUMMY_ACCOUNTS: usize = 50;
    const MAX_CHILD_ACCOUNTS: usize = 100;
    let mut slot = 0;

    // Set secondary indexes
    let account_indexes = AccountSecondaryIndexes {
        keys: None,
        indexes: HashSet::from([AccountIndex::ProgramId]),
    };

    // AccountDB Setup
    let accounts_db = AccountsDb::new_single_for_tests_with_secondary_indexes(account_indexes);

    // Assert that list is empty. No accounts added yet.
    let mut test_largest_keys = accounts_db.accounts_index.get_largest_keys(
        &AccountIndex::ProgramId,
        MAX_NUM_LARGEST_INDEX_KEYS_RETURNED,
    );
    assert_eq!(0, test_largest_keys.len());

    // Add some basic system owned accounts
    let mut dummy_account_pubkeys = Vec::with_capacity(NUM_DUMMY_ACCOUNTS);
    let mut num_generator = thread_rng();
    let key_size_range = Uniform::new_inclusive(0, MAX_CHILD_ACCOUNTS);
    for i in 1..=NUM_DUMMY_ACCOUNTS {
        let pubkey = Pubkey::new_unique();
        dummy_account_pubkeys.push(pubkey);
        let account = AccountSharedData::from(Account {
            lamports: 11111111,
            owner: system_program::id(),
            ..Account::default()
        });
        // Store account in the AccountsDB
        accounts_db.store_for_tests(slot, &[(&dummy_account_pubkeys[i - 1], &account)]);
        slot += 1;
        // Check that the system pubkey increments each time
        // (every dummy account is owned by the system program, so the
        // system program's key size grows by one per store).
        test_largest_keys = accounts_db.accounts_index.get_largest_keys(
            &AccountIndex::ProgramId,
            MAX_NUM_LARGEST_INDEX_KEYS_RETURNED,
        );
        assert_eq!(test_largest_keys.len(), 1);
        let number_system_owned_accounts = test_largest_keys[0].0;
        assert_eq!(i, number_system_owned_accounts);
    }

    // Now add a random number of accounts each owned by one of the newly
    // created dummy pubkeys
    for dummy_account in &dummy_account_pubkeys {
        // Add child accounts to each dummy account
        // NOTE(review): `sample_iter(..).next()` always yields `Some`, so the
        // `unwrap_or(0)` fallbacks below are effectively never taken.
        let num_children = (&mut num_generator).sample_iter(key_size_range).next();
        for j in 0..num_children.unwrap_or(0) {
            let child_pubkey = Pubkey::new_unique();
            let child_account = AccountSharedData::from(Account {
                lamports: ((j as u64) + 1) * 1000,
                owner: *dummy_account,
                ..Account::default()
            });
            accounts_db.store_for_tests(slot, &[(&child_pubkey, &child_account)]);
            slot += 1;
        }
        // The rest of this loop body mirrors the backend's
        // HierarchicalOrderedMap::update_map logic on the local expected list.
        // Check for entries with the same key size for sub sorting by pubkey
        let existing_key_size_position = test_largest_keys
            .iter()
            .position(|(x, _)| *x == num_children.unwrap_or(0));
        // Find where it should go and insert it
        let key_position = match test_largest_keys
            .binary_search_by_key(&num_children.unwrap_or(0), |(size, _)| *size)
        {
            Ok(found_position) => found_position,
            Err(woudbe_position) => woudbe_position,
        };
        test_largest_keys.insert(key_position, (num_children.unwrap_or(0), *dummy_account));
        // If there were indeed more elements with the same key size sort them by Pubkey
        if existing_key_size_position.is_some() {
            // Obtain a slice of mutable references to all elements with the same key_size
            let mut sub_slice = test_largest_keys
                .split_mut(|(k, _)| *k != num_children.unwrap_or(0))
                .flatten()
                .collect_vec();
            // Sort them...
            let mut sorting_buffer = sub_slice.iter().map(|x| *(*x)).collect_vec();
            sorting_buffer.sort_unstable_by_key(|(_, v)| *v);
            // Copy back into the list
            for i in 0..sub_slice.len() {
                *(sub_slice[i]) = (sorting_buffer[i].0, sorting_buffer[i].1);
            }
        }
        // Prune list (smallest entries live at the front, so drop from index 0)
        while test_largest_keys.len() > MAX_NUM_LARGEST_INDEX_KEYS_RETURNED {
            test_largest_keys.remove(0);
        }
    }

    // Verify secondary index list matches expected list built above.
    let largest_keys = accounts_db.accounts_index.get_largest_keys(
        &AccountIndex::ProgramId,
        MAX_NUM_LARGEST_INDEX_KEYS_RETURNED,
    );
    // Reverse the tracking vector and check for equality
    // Note: Backend stores the `key_size_index` in ascending key size, but
    // `get_largest_keys` returns the data in descending order, ie. the largest at the top.
    test_largest_keys = test_largest_keys.into_iter().rev().collect_vec();
    assert_eq!(test_largest_keys, largest_keys);

    // Test queries for a partial list and past max return size
    let mut largest_program_id_keys = Vec::<(usize, Pubkey)>::new();
    for i in 0..=MAX_NUM_LARGEST_INDEX_KEYS_RETURNED + 1 {
        largest_program_id_keys = accounts_db
            .accounts_index
            .get_largest_keys(&AccountIndex::ProgramId, i);
        if i <= MAX_NUM_LARGEST_INDEX_KEYS_RETURNED {
            assert_eq!(largest_program_id_keys.len(), i);
        } else {
            // Requests above the cap are clamped to the cap.
            assert_eq!(
                largest_program_id_keys.len(),
                MAX_NUM_LARGEST_INDEX_KEYS_RETURNED
            );
        }
    }

    // Root the bank preparing for removal
    (0..slot).for_each(|slot| {
        accounts_db.calculate_accounts_delta_hash(slot);
        accounts_db.add_root_and_flush_write_cache(slot);
    });

    // Test Removal of Keys
    // First just remove a single key
    let mut smallest_key: Pubkey;
    let mut smallest_key_size: usize;
    let mut smallest_key_inner_keys: Vec<Pubkey>;
    let mut try_again = 0;
    let zero_lamport_account =
        AccountSharedData::new(0, 0, AccountSharedData::default().owner());
    loop {
        // Pick the (try_again-th from the) smallest returned entry.
        smallest_key = largest_program_id_keys[largest_program_id_keys.len() - 1 - try_again].1;
        smallest_key_size =
            largest_program_id_keys[largest_program_id_keys.len() - 1 - try_again].0;
        // Scan for all accounts owned by `smallest_key`.
        let mut collector = Vec::new();
        accounts_db
            .scan_accounts(
                &Ancestors::default(),
                0,
                |some_account_tuple| {
                    if let Some(mapped_account_tuple) = some_account_tuple
                        .filter(|(_, account, _)| {
                            Accounts::is_loadable(account.lamports())
                                && account.owner() == &smallest_key
                        })
                        .map(|(pubkey, account, _slot)| (*pubkey, account))
                    {
                        collector.push(mapped_account_tuple)
                    }
                },
                &ScanConfig::new(true),
            )
            .ok();
        smallest_key_inner_keys = collector.into_iter().map(|(k, _)| k).collect_vec();
        let single_inner_key = smallest_key_inner_keys.pop().unwrap();
        // Overwrite the account as a 0 lamport account and clean.
        accounts_db.store_for_tests(slot, &[(&single_inner_key, &zero_lamport_account)]);
        accounts_db.calculate_accounts_delta_hash(slot);
        accounts_db.add_root_and_flush_write_cache(slot);
        slot += 1;
        accounts_db.clean_accounts_for_tests();
        // Read back
        largest_program_id_keys = accounts_db.accounts_index.get_largest_keys(
            &AccountIndex::ProgramId,
            MAX_NUM_LARGEST_INDEX_KEYS_RETURNED,
        );
        // Ensure the below check is comparing the same pubkey in case there were ties in the list for keysize.
        if largest_program_id_keys[largest_program_id_keys.len() - 1 - try_again].1
            == smallest_key
        {
            break;
        }
        // If there were a duplicate keysize, just move up in the largest key list.
        // worst case use the second largest key and keep removing till the tie is broken.
        // NOTE(review): if the tie is never broken once `try_again` reaches
        // `len() - 2`, this loop retries the same position — confirm termination
        // is guaranteed by the decreasing key sizes.
        else if try_again < largest_program_id_keys.len() - 2 {
            try_again += 1;
        }
    }
    // Make sure outer key size decreased
    assert_eq!(
        smallest_key_size - 1,
        largest_program_id_keys[largest_program_id_keys.len() - 1 - try_again].0
    );

    // Test removal of multiple keys
    for key in smallest_key_inner_keys {
        accounts_db.store_for_tests(slot, &[(&key, &zero_lamport_account)]);
    }
    accounts_db.calculate_accounts_delta_hash(slot);
    accounts_db.add_root_and_flush_write_cache(slot);
    accounts_db.clean_accounts_for_tests();
    // Read back
    largest_program_id_keys = accounts_db.accounts_index.get_largest_keys(
        &AccountIndex::ProgramId,
        MAX_NUM_LARGEST_INDEX_KEYS_RETURNED,
    );
    // Since all inner keys were removed, make sure outer key is gone too.
    let outer_key_removed = !largest_program_id_keys
        .iter()
        .any(|(_, v)| *v == smallest_key);
    assert!(outer_key_removed);
}
|
||||
}
|
||||
|
|
|
@ -1487,6 +1487,27 @@ impl<T: IndexValue> AccountsIndex<T> {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn get_largest_keys(
|
||||
&self,
|
||||
index: &AccountIndex,
|
||||
max_entries: usize,
|
||||
) -> Vec<(usize, Pubkey)> {
|
||||
match index {
|
||||
AccountIndex::ProgramId => self
|
||||
.program_id_index
|
||||
.key_size_index
|
||||
.get_largest_keys(max_entries),
|
||||
AccountIndex::SplTokenOwner => self
|
||||
.spl_token_owner_index
|
||||
.key_size_index
|
||||
.get_largest_keys(max_entries),
|
||||
AccountIndex::SplTokenMint => self
|
||||
.spl_token_mint_index
|
||||
.key_size_index
|
||||
.get_largest_keys(max_entries),
|
||||
}
|
||||
}
|
||||
|
||||
/// log any secondary index counts, if non-zero
|
||||
pub(crate) fn log_secondary_indexes(&self) {
|
||||
if !self.program_id_index.index.is_empty() {
|
||||
|
|
|
@ -12,6 +12,9 @@ use {
|
|||
},
|
||||
};
|
||||
|
||||
/// Hard cap on the number of (key size, pubkey) entries a single
/// `get_largest_keys` query may return.
pub const MAX_NUM_LARGEST_INDEX_KEYS_RETURNED: usize = 20;
/// Number of largest-key entries each secondary index keeps cached
/// in its `SecondaryIndexLargestKeys` tracker.
pub const NUM_LARGEST_INDEX_KEYS_CACHED: usize = 200;
|
||||
|
||||
// The only cases where an inner key should map to a different outer key is
|
||||
// if the key had different account data for the indexed key across different
|
||||
// slots. As this is rare, it should be ok to use a Vec here over a HashSet, even
|
||||
|
@ -99,12 +102,112 @@ impl SecondaryIndexEntry for RwLockSecondaryIndexEntry {
|
|||
}
|
||||
}
|
||||
|
||||
/// A capacity-bounded map backed by a `Vec` kept sorted in ascending order by
/// key, with ties on key broken by ascending value. Values are unique:
/// re-inserting an existing value moves it to the position dictated by its new
/// key. When the map is full, the smallest entry (front of the vec) is evicted.
#[derive(Debug, Default)]
struct HierarchicalOrderedMap<K, V>
where
    K: Default + PartialEq + Ord + Clone,
    V: Default + PartialEq + Ord + Clone,
{
    /// Maximum number of entries retained.
    capacity: usize,
    /// Entries sorted ascending by key, then by value within equal keys.
    map: Vec<(K, V)>,
}

impl<K, V> HierarchicalOrderedMap<K, V>
where
    K: Default + PartialEq + Ord + Clone,
    V: Default + PartialEq + Ord + Clone,
{
    /// Creates an empty map that will hold at most `capacity` entries.
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity,
            map: Vec::new(),
        }
    }
    /// Returns the underlying sorted vec of `(key, value)` pairs.
    fn get_map(&self) -> &Vec<(K, V)> {
        &self.map
    }
    /// Sorts the run of entries whose key equals `slice_key` by value,
    /// restoring the key-then-value ordering invariant after an insert.
    fn sort_slice_by_value(&mut self, slice_key: &K) {
        // Obtain a slice of mutable references to all elements with the same key
        for sub_slice in self.map.split_mut(|(k, _)| k != slice_key) {
            // Sort them
            if !sub_slice.is_empty() {
                sub_slice.sort_unstable_by_key(|(_, v)| v.clone());
            }
        }
    }
    /// Inserts `(key, value)`, replacing any existing entry with the same
    /// value, while maintaining sorted order and the capacity bound.
    fn update_map(&mut self, key: &K, value: &V) {
        // Check if the value already exists.
        let existing_value_position = self.map.iter().position(|(_, y)| y == value);
        // Remove it if it does.
        // Note: Removal maintains sorted order, updating would require a re-sort.
        // Thus, since we have to search to find the new position anyways,
        // just throw it away and re-insert as if its a new element.
        if let Some(position) = existing_value_position {
            self.map.remove(position);
        }
        // If its a new value...
        else {
            // Exit early if the list is full and the key is not larger than the
            // smallest element. `map_or(true, ..)` also covers a zero-capacity
            // map, which previously panicked on `self.map[0]` with an empty vec.
            if self.map.len() >= self.capacity
                && self.map.first().map_or(true, |(smallest, _)| *smallest > *key)
            {
                return;
            }
        }
        // Find where the new entry goes and insert it.
        // Also report if there are more elements in the list with the same key => they need sorting.
        let (key_position, needs_sort) =
            match self.map.binary_search_by_key(key, |(k, _)| k.clone()) {
                Ok(found_position) => (found_position, true),
                Err(wouldbe_position) => (wouldbe_position, false),
            };
        self.map.insert(key_position, (key.clone(), value.clone()));
        // If there were indeed more elements with the same key sort them by value
        if needs_sort {
            self.sort_slice_by_value(key);
        }
        // Prune list if too big (smallest entries are at the front)
        while self.map.len() > self.capacity {
            self.map.remove(0);
        }
    }
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SecondaryIndexLargestKeys(RwLock<HierarchicalOrderedMap<usize, Pubkey>>);
|
||||
impl Default for SecondaryIndexLargestKeys {
|
||||
fn default() -> Self {
|
||||
let container = HierarchicalOrderedMap::<usize, Pubkey>::new(NUM_LARGEST_INDEX_KEYS_CACHED);
|
||||
SecondaryIndexLargestKeys(RwLock::new(container))
|
||||
}
|
||||
}
|
||||
impl SecondaryIndexLargestKeys {
|
||||
pub fn get_largest_keys(&self, max_entries: usize) -> Vec<(usize, Pubkey)> {
|
||||
// Obtain the shared resource.
|
||||
let largest_key_list = self.0.read().unwrap();
|
||||
// Collect elements into a vector.
|
||||
let num_entries = std::cmp::min(MAX_NUM_LARGEST_INDEX_KEYS_RETURNED, max_entries);
|
||||
largest_key_list
|
||||
.get_map()
|
||||
.iter()
|
||||
.rev()
|
||||
.take(num_entries)
|
||||
.copied()
|
||||
.collect::<Vec<(usize, Pubkey)>>()
|
||||
}
|
||||
pub fn update(&self, key_size: &usize, pubkey: &Pubkey) {
|
||||
// Obtain the shared resource.
|
||||
let mut largest_key_list = self.0.write().unwrap();
|
||||
// Update the list
|
||||
largest_key_list.update_map(key_size, pubkey);
|
||||
}
|
||||
}
|
||||
|
||||
/// A secondary index over accounts: maps an "outer" index key (e.g. a program
/// id, token owner, or token mint — see `AccountsIndex::get_largest_keys`) to
/// the inner account pubkeys associated with it.
#[derive(Debug, Default)]
pub struct SecondaryIndex<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send> {
    // Name used when reporting this index's metrics/stats
    metrics_name: &'static str,
    // Map from index keys to index values
    pub index: DashMap<Pubkey, SecondaryIndexEntryType>,
    // Reverse map from inner account pubkey back to its outer index key(s);
    // kept in sync with `index` on insert/remove
    pub reverse_index: DashMap<Pubkey, SecondaryReverseIndexEntry>,
    // Tracks the largest outer keys by inner-key count, for the
    // largest-keys RPC query
    pub key_size_index: SecondaryIndexLargestKeys,
    stats: SecondaryIndexStats,
}
|
||||
|
||||
|
@ -125,7 +228,11 @@ impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
|
|||
.get(key)
|
||||
.unwrap_or_else(|| self.index.entry(*key).or_default().downgrade());
|
||||
|
||||
let key_size_cache = pubkeys_map.len();
|
||||
pubkeys_map.insert_if_not_exists(inner_key, &self.stats.num_inner_keys);
|
||||
if key_size_cache != pubkeys_map.len() {
|
||||
self.key_size_index.update(&pubkeys_map.len(), key);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -173,6 +280,7 @@ impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
|
|||
// If we deleted a pubkey from the reverse_index, then the corresponding entry
|
||||
// better exist in this index as well or the two indexes are out of sync!
|
||||
assert!(inner_key_map.value().remove_inner_key(removed_inner_key));
|
||||
self.key_size_index.update(&inner_key_map.len(), outer_key);
|
||||
inner_key_map.is_empty()
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue