Add secondary indexes (#14212)

carllin 2020-12-31 18:06:03 -08:00 committed by GitHub
parent 4a3d217839
commit 5affd8aa72
20 changed files with 1875 additions and 234 deletions

View File

@ -39,9 +39,11 @@ use solana_metrics::inc_new_counter_info;
use solana_perf::packet::PACKET_DATA_SIZE;
use solana_runtime::{
accounts::AccountAddressFilter,
accounts_index::{AccountIndex, IndexKey},
bank::Bank,
bank_forks::BankForks,
commitment::{BlockCommitmentArray, BlockCommitmentCache, CommitmentSlots},
inline_spl_token_v2_0::{SPL_TOKEN_ACCOUNT_MINT_OFFSET, SPL_TOKEN_ACCOUNT_OWNER_OFFSET},
};
use solana_sdk::{
account::Account,
@ -109,6 +111,7 @@ pub struct JsonRpcConfig {
pub enable_bigtable_ledger_storage: bool,
pub enable_bigtable_ledger_upload: bool,
pub max_multiple_accounts: Option<usize>,
pub account_indexes: HashSet<AccountIndex>,
}
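For context, a minimal sketch (not part of this diff) of how a caller might enable the new indexes when building the RPC config, assuming JsonRpcConfig implements Default:

use std::collections::HashSet;
use solana_runtime::accounts_index::AccountIndex;

// Hypothetical construction; all other fields left at their defaults.
let mut account_indexes = HashSet::new();
account_indexes.insert(AccountIndex::ProgramId);
account_indexes.insert(AccountIndex::SplTokenOwner);
account_indexes.insert(AccountIndex::SplTokenMint);
let config = JsonRpcConfig {
    account_indexes,
    ..JsonRpcConfig::default()
};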
#[derive(Clone)]
@ -315,12 +318,19 @@ impl JsonRpcRequestProcessor {
let encoding = config.encoding.unwrap_or(UiAccountEncoding::Binary);
let data_slice_config = config.data_slice;
check_slice_and_encoding(&encoding, data_slice_config.is_some())?;
let keyed_accounts = get_filtered_program_accounts(&bank, program_id, filters);
let keyed_accounts = {
if let Some(owner) = get_spl_token_owner_filter(program_id, &filters) {
self.get_filtered_spl_token_accounts_by_owner(&bank, &owner, filters)
} else {
self.get_filtered_program_accounts(&bank, program_id, filters)
}
};
let result =
if program_id == &spl_token_id_v2_0() && encoding == UiAccountEncoding::JsonParsed {
get_parsed_token_accounts(bank, keyed_accounts).collect()
get_parsed_token_accounts(bank, keyed_accounts.into_iter()).collect()
} else {
keyed_accounts
.into_iter()
.map(|(pubkey, account)| RpcKeyedAccount {
pubkey: pubkey.to_string(),
account: UiAccount::encode(
@ -1155,29 +1165,20 @@ impl JsonRpcRequestProcessor {
"Invalid param: not a v2.0 Token mint".to_string(),
));
}
let filters = vec![
// Filter on Mint address
RpcFilterType::Memcmp(Memcmp {
offset: 0,
bytes: MemcmpEncodedBytes::Binary(mint.to_string()),
encoding: None,
}),
// Filter on Token Account state
RpcFilterType::DataSize(TokenAccount::get_packed_len() as u64),
];
let mut token_balances: Vec<RpcTokenAccountBalance> =
get_filtered_program_accounts(&bank, &mint_owner, filters)
.map(|(address, account)| {
let amount = TokenAccount::unpack(&account.data)
.map(|account| account.amount)
.unwrap_or(0);
let amount = token_amount_to_ui_amount(amount, decimals);
RpcTokenAccountBalance {
address: address.to_string(),
amount,
}
})
.collect();
let mut token_balances: Vec<RpcTokenAccountBalance> = self
.get_filtered_spl_token_accounts_by_mint(&bank, &mint, vec![])
.into_iter()
.map(|(address, account)| {
let amount = TokenAccount::unpack(&account.data)
.map(|account| account.amount)
.unwrap_or(0);
let amount = token_amount_to_ui_amount(amount, decimals);
RpcTokenAccountBalance {
address: address.to_string(),
amount,
}
})
.collect();
token_balances.sort_by(|a, b| {
a.amount
.amount
@ -1201,18 +1202,9 @@ impl JsonRpcRequestProcessor {
let encoding = config.encoding.unwrap_or(UiAccountEncoding::Binary);
let data_slice_config = config.data_slice;
check_slice_and_encoding(&encoding, data_slice_config.is_some())?;
let (token_program_id, mint) = get_token_program_id_and_mint(&bank, token_account_filter)?;
let (_, mint) = get_token_program_id_and_mint(&bank, token_account_filter)?;
let mut filters = vec![
// Filter on Owner address
RpcFilterType::Memcmp(Memcmp {
offset: 32,
bytes: MemcmpEncodedBytes::Binary(owner.to_string()),
encoding: None,
}),
// Filter on Token Account state
RpcFilterType::DataSize(TokenAccount::get_packed_len() as u64),
];
let mut filters = vec![];
if let Some(mint) = mint {
// Optional filter on Mint address
filters.push(RpcFilterType::Memcmp(Memcmp {
@ -1221,11 +1213,13 @@ impl JsonRpcRequestProcessor {
encoding: None,
}));
}
let keyed_accounts = get_filtered_program_accounts(&bank, &token_program_id, filters);
let keyed_accounts = self.get_filtered_spl_token_accounts_by_owner(&bank, owner, filters);
let accounts = if encoding == UiAccountEncoding::JsonParsed {
get_parsed_token_accounts(bank.clone(), keyed_accounts).collect()
get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect()
} else {
keyed_accounts
.into_iter()
.map(|(pubkey, account)| RpcKeyedAccount {
pubkey: pubkey.to_string(),
account: UiAccount::encode(
@ -1269,22 +1263,22 @@ impl JsonRpcRequestProcessor {
bytes: MemcmpEncodedBytes::Binary(delegate.to_string()),
encoding: None,
}),
// Filter on Token Account state
RpcFilterType::DataSize(TokenAccount::get_packed_len() as u64),
];
if let Some(mint) = mint {
// Optional filter on Mint address
filters.push(RpcFilterType::Memcmp(Memcmp {
offset: 0,
bytes: MemcmpEncodedBytes::Binary(mint.to_string()),
encoding: None,
}));
}
let keyed_accounts = get_filtered_program_accounts(&bank, &token_program_id, filters);
// Optional filter on Mint address, uses mint account index for scan
let keyed_accounts = if let Some(mint) = mint {
self.get_filtered_spl_token_accounts_by_mint(&bank, &mint, filters)
} else {
// Filter on Token Account state
filters.push(RpcFilterType::DataSize(
TokenAccount::get_packed_len() as u64
));
self.get_filtered_program_accounts(&bank, &token_program_id, filters)
};
let accounts = if encoding == UiAccountEncoding::JsonParsed {
get_parsed_token_accounts(bank.clone(), keyed_accounts).collect()
get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect()
} else {
keyed_accounts
.into_iter()
.map(|(pubkey, account)| RpcKeyedAccount {
pubkey: pubkey.to_string(),
account: UiAccount::encode(
@ -1299,6 +1293,111 @@ impl JsonRpcRequestProcessor {
};
Ok(new_response(&bank, accounts))
}
/// Use a set of filters to get a vector of keyed program accounts from a bank
fn get_filtered_program_accounts(
&self,
bank: &Arc<Bank>,
program_id: &Pubkey,
filters: Vec<RpcFilterType>,
) -> Vec<(Pubkey, Account)> {
let filter_closure = |account: &Account| {
filters.iter().all(|filter_type| match filter_type {
RpcFilterType::DataSize(size) => account.data.len() as u64 == *size,
RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data),
})
};
if self
.config
.account_indexes
.contains(&AccountIndex::ProgramId)
{
bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(*program_id), |account| {
account.owner == *program_id && filter_closure(account)
})
} else {
bank.get_filtered_program_accounts(program_id, filter_closure)
}
}
/// Get a vector of spl-token accounts by owner address
fn get_filtered_spl_token_accounts_by_owner(
&self,
bank: &Arc<Bank>,
owner_key: &Pubkey,
mut filters: Vec<RpcFilterType>,
) -> Vec<(Pubkey, Account)> {
// The by-owner accounts index checks for Token Account state and Owner address on inclusion.
// However, due to the current AccountsDB implementation, accounts may remain in storage as
// zero-lamport Account::Default() entries after being wiped and reinitialized in later updates.
// We include the redundant filters here to avoid returning these accounts.
//
// Filter on Token Account state
filters.push(RpcFilterType::DataSize(
TokenAccount::get_packed_len() as u64
));
// Filter on Owner address
filters.push(RpcFilterType::Memcmp(Memcmp {
offset: SPL_TOKEN_ACCOUNT_OWNER_OFFSET,
bytes: MemcmpEncodedBytes::Binary(owner_key.to_string()),
encoding: None,
}));
if self
.config
.account_indexes
.contains(&AccountIndex::SplTokenOwner)
{
bank.get_filtered_indexed_accounts(&IndexKey::SplTokenOwner(*owner_key), |account| {
account.owner == spl_token_id_v2_0()
&& filters.iter().all(|filter_type| match filter_type {
RpcFilterType::DataSize(size) => account.data.len() as u64 == *size,
RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data),
})
})
} else {
self.get_filtered_program_accounts(bank, &spl_token_id_v2_0(), filters)
}
}
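To see why the redundant filters matter, a minimal test-style sketch (not from this commit): a wiped account that lingers in storage as Account::default() has empty data, so the DataSize filter screens it out even if the secondary index still lists its pubkey:

use solana_sdk::account::Account;

let wiped = Account::default();
// An spl-token account packs to 165 bytes; a wiped Account::default()
// has no data and no lamports, so the redundant filters reject it.
assert_eq!(wiped.lamports, 0);
assert_ne!(wiped.data.len() as u64, TokenAccount::get_packed_len() as u64);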
/// Get a vector of spl-token accounts by mint address
fn get_filtered_spl_token_accounts_by_mint(
&self,
bank: &Arc<Bank>,
mint_key: &Pubkey,
mut filters: Vec<RpcFilterType>,
) -> Vec<(Pubkey, Account)> {
// The by-mint accounts index checks for Token Account state and Mint address on inclusion.
// However, due to the current AccountsDB implementation, accounts may remain in storage as
// zero-lamport Account::Default() entries after being wiped and reinitialized in later updates.
// We include the redundant filters here to avoid returning these accounts.
//
// Filter on Token Account state
filters.push(RpcFilterType::DataSize(
TokenAccount::get_packed_len() as u64
));
// Filter on Mint address
filters.push(RpcFilterType::Memcmp(Memcmp {
offset: SPL_TOKEN_ACCOUNT_MINT_OFFSET,
bytes: MemcmpEncodedBytes::Binary(mint_key.to_string()),
encoding: None,
}));
if self
.config
.account_indexes
.contains(&AccountIndex::SplTokenMint)
{
bank.get_filtered_indexed_accounts(&IndexKey::SplTokenMint(*mint_key), |account| {
account.owner == spl_token_id_v2_0()
&& filters.iter().all(|filter_type| match filter_type {
RpcFilterType::DataSize(size) => account.data.len() as u64 == *size,
RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data),
})
})
} else {
self.get_filtered_program_accounts(bank, &spl_token_id_v2_0(), filters)
}
}
}
fn verify_transaction(transaction: &Transaction) -> Result<()> {
@ -1397,20 +1496,32 @@ fn get_encoded_account(
Ok(response)
}
/// Use a set of filters to get an iterator of keyed program accounts from a bank
fn get_filtered_program_accounts(
bank: &Arc<Bank>,
program_id: &Pubkey,
filters: Vec<RpcFilterType>,
) -> impl Iterator<Item = (Pubkey, Account)> {
bank.get_program_accounts(&program_id)
.into_iter()
.filter(move |(_, account)| {
filters.iter().all(|filter_type| match filter_type {
RpcFilterType::DataSize(size) => account.data.len() as u64 == *size,
RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data),
})
})
fn get_spl_token_owner_filter(program_id: &Pubkey, filters: &[RpcFilterType]) -> Option<Pubkey> {
if program_id != &spl_token_id_v2_0() {
return None;
}
let mut data_size_filter: Option<u64> = None;
let mut owner_key: Option<Pubkey> = None;
for filter in filters {
match filter {
RpcFilterType::DataSize(size) => data_size_filter = Some(*size),
RpcFilterType::Memcmp(Memcmp {
offset: SPL_TOKEN_ACCOUNT_OWNER_OFFSET,
bytes: MemcmpEncodedBytes::Binary(bytes),
..
}) => {
if let Ok(key) = Pubkey::from_str(bytes) {
owner_key = Some(key)
}
}
_ => {}
}
}
if data_size_filter == Some(TokenAccount::get_packed_len() as u64) {
owner_key
} else {
None
}
}
pub(crate) fn get_parsed_token_account(
@ -5783,6 +5894,54 @@ pub mod tests {
);
}
#[test]
fn test_get_spl_token_owner_filter() {
let owner = Pubkey::new_unique();
assert_eq!(
get_spl_token_owner_filter(
&Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap(),
&[
RpcFilterType::Memcmp(Memcmp {
offset: 32,
bytes: MemcmpEncodedBytes::Binary(owner.to_string()),
encoding: None
}),
RpcFilterType::DataSize(165)
],
)
.unwrap(),
owner
);
// Filtering on mint instead of owner
assert!(get_spl_token_owner_filter(
&Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap(),
&[
RpcFilterType::Memcmp(Memcmp {
offset: 0,
bytes: MemcmpEncodedBytes::Binary(owner.to_string()),
encoding: None
}),
RpcFilterType::DataSize(165)
],
)
.is_none());
// Wrong program id
assert!(get_spl_token_owner_filter(
&Pubkey::new_unique(),
&[
RpcFilterType::Memcmp(Memcmp {
offset: 32,
bytes: MemcmpEncodedBytes::Binary(owner.to_string()),
encoding: None
}),
RpcFilterType::DataSize(165)
],
)
.is_none());
}
#[test]
fn test_rpc_single_gossip() {
let exit = Arc::new(AtomicBool::new(false));

View File

@ -45,6 +45,7 @@ use solana_ledger::{
use solana_measure::measure::Measure;
use solana_metrics::datapoint_info;
use solana_runtime::{
accounts_index::AccountIndex,
bank::Bank,
bank_forks::{BankForks, SnapshotConfig},
commitment::BlockCommitmentCache,
@ -117,6 +118,7 @@ pub struct ValidatorConfig {
pub send_transaction_leader_forward_count: u64,
pub no_poh_speed_test: bool,
pub poh_pinned_cpu_core: usize,
pub account_indexes: HashSet<AccountIndex>,
}
impl Default for ValidatorConfig {
@ -161,6 +163,7 @@ impl Default for ValidatorConfig {
send_transaction_leader_forward_count: 2,
no_poh_speed_test: true,
poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
account_indexes: HashSet::new(),
}
}
}
@ -951,6 +954,7 @@ fn new_banks_from_ledger(
new_hard_forks: config.new_hard_forks.clone(),
frozen_accounts: config.frozen_accounts.clone(),
debug_keys: config.debug_keys.clone(),
account_indexes: config.account_indexes.clone(),
..blockstore_processor::ProcessOptions::default()
};

View File

@ -59,7 +59,10 @@ mod tests {
signature::{Keypair, Signer},
system_transaction,
};
use std::{fs, path::PathBuf, sync::atomic::AtomicBool, sync::mpsc::channel, sync::Arc};
use std::{
collections::HashSet, fs, path::PathBuf, sync::atomic::AtomicBool, sync::mpsc::channel,
sync::Arc,
};
use tempfile::TempDir;
DEFINE_SNAPSHOT_VERSION_PARAMETERIZED_TEST_FUNCTIONS!(V1_2_0, Development, V1_2_0_Development);
@ -93,6 +96,7 @@ mod tests {
&[],
None,
None,
HashSet::new(),
);
bank0.freeze();
let mut bank_forks = BankForks::new(bank0);
@ -148,6 +152,7 @@ mod tests {
old_genesis_config,
None,
None,
HashSet::new(),
)
.unwrap();

View File

@ -68,6 +68,7 @@ pub fn load(
genesis_config,
process_options.debug_keys.clone(),
Some(&crate::builtins::get(process_options.bpf_jit)),
process_options.account_indexes.clone(),
)
.expect("Load from snapshot failed");
if let Some(shrink_paths) = shrink_paths {

View File

@ -15,6 +15,7 @@ use solana_measure::{measure::Measure, thread_mem_usage};
use solana_metrics::{datapoint_error, inc_new_counter_debug};
use solana_rayon_threadlimit::get_thread_count;
use solana_runtime::{
accounts_index::AccountIndex,
bank::{
Bank, InnerInstructionsList, TransactionBalancesSet, TransactionExecutionResult,
TransactionLogMessages, TransactionResults,
@ -344,6 +345,7 @@ pub struct ProcessOptions {
pub new_hard_forks: Option<Vec<Slot>>,
pub frozen_accounts: Vec<Pubkey>,
pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
pub account_indexes: HashSet<AccountIndex>,
}
pub fn process_blockstore(
@ -368,6 +370,7 @@ pub fn process_blockstore(
&opts.frozen_accounts,
opts.debug_keys.clone(),
Some(&crate::builtins::get(opts.bpf_jit)),
opts.account_indexes.clone(),
);
let bank0 = Arc::new(bank0);
info!("processing ledger for slot 0...");
@ -2891,7 +2894,14 @@ pub mod tests {
genesis_config: &GenesisConfig,
account_paths: Vec<PathBuf>,
) -> EpochSchedule {
let bank = Bank::new_with_paths(&genesis_config, account_paths, &[], None, None);
let bank = Bank::new_with_paths(
&genesis_config,
account_paths,
&[],
None,
None,
HashSet::new(),
);
*bank.epoch_schedule()
}

View File

@ -14,7 +14,7 @@ use solana_sdk::{
pubkey::Pubkey,
};
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
path::PathBuf,
sync::{Arc, RwLock},
thread::Builder,
@ -49,6 +49,7 @@ fn test_accounts_create(bencher: &mut Bencher) {
&[],
None,
None,
HashSet::new(),
);
bencher.iter(|| {
let mut pubkeys: Vec<Pubkey> = vec![];
@ -66,6 +67,7 @@ fn test_accounts_squash(bencher: &mut Bencher) {
&[],
None,
None,
HashSet::new(),
));
let mut pubkeys: Vec<Pubkey> = vec![];
deposit_many(&bank1, &mut pubkeys, 250_000);

View File

@ -4,7 +4,8 @@ extern crate test;
use rand::{thread_rng, Rng};
use solana_runtime::{accounts_db::AccountInfo, accounts_index::AccountsIndex};
use solana_sdk::pubkey;
use solana_sdk::pubkey::{self, Pubkey};
use std::collections::HashSet;
use test::Bencher;
#[bench]
@ -18,7 +19,15 @@ fn bench_accounts_index(bencher: &mut Bencher) {
let index = AccountsIndex::<AccountInfo>::default();
for f in 0..NUM_FORKS {
for pubkey in pubkeys.iter().take(NUM_PUBKEYS) {
index.upsert(f, pubkey, AccountInfo::default(), &mut reclaims);
index.upsert(
f,
pubkey,
&Pubkey::default(),
&[],
&HashSet::new(),
AccountInfo::default(),
&mut reclaims,
);
}
}
@ -30,6 +39,9 @@ fn bench_accounts_index(bencher: &mut Bencher) {
index.upsert(
fork,
&pubkeys[pubkey],
&Pubkey::default(),
&[],
&HashSet::new(),
AccountInfo::default(),
&mut reclaims,
);

View File

@ -1,6 +1,6 @@
use crate::{
accounts_db::{AccountsDB, AppendVecId, BankHashInfo, ErrorCounters},
accounts_index::Ancestors,
accounts_index::{AccountIndex, Ancestors, IndexKey},
append_vec::StoredAccount,
bank::{
NonceRollbackFull, NonceRollbackInfo, TransactionCheckResult, TransactionExecutionResult,
@ -82,8 +82,20 @@ pub enum AccountAddressFilter {
impl Accounts {
pub fn new(paths: Vec<PathBuf>, cluster_type: &ClusterType) -> Self {
Self::new_with_indexes(paths, cluster_type, HashSet::new())
}
pub fn new_with_indexes(
paths: Vec<PathBuf>,
cluster_type: &ClusterType,
account_indexes: HashSet<AccountIndex>,
) -> Self {
Self {
accounts_db: Arc::new(AccountsDB::new(paths, cluster_type)),
accounts_db: Arc::new(AccountsDB::new_with_indexes(
paths,
cluster_type,
account_indexes,
)),
account_locks: Mutex::new(HashSet::new()),
readonly_locks: Arc::new(RwLock::new(Some(HashMap::new()))),
..Self::default()
@ -447,7 +459,7 @@ impl Accounts {
|stored_account: &StoredAccount,
_id: AppendVecId,
accum: &mut Vec<(Pubkey, u64, B)>| {
if let Some(val) = func(stored_account) {
if let Some(val) = func(&stored_account) {
accum.push((
stored_account.meta.pubkey,
std::u64::MAX - stored_account.meta.write_version,
@ -592,6 +604,37 @@ impl Accounts {
)
}
pub fn load_by_program_with_filter<F: Fn(&Account) -> bool>(
&self,
ancestors: &Ancestors,
program_id: &Pubkey,
filter: F,
) -> Vec<(Pubkey, Account)> {
self.accounts_db.scan_accounts(
ancestors,
|collector: &mut Vec<(Pubkey, Account)>, some_account_tuple| {
Self::load_while_filtering(collector, some_account_tuple, |account| {
account.owner == *program_id && filter(account)
})
},
)
}
pub fn load_by_index_key_with_filter<F: Fn(&Account) -> bool>(
&self,
ancestors: &Ancestors,
index_key: &IndexKey,
filter: F,
) -> Vec<(Pubkey, Account)> {
self.accounts_db.index_scan_accounts(
ancestors,
*index_key,
|collector: &mut Vec<(Pubkey, Account)>, some_account_tuple| {
Self::load_while_filtering(collector, some_account_tuple, |account| filter(account))
},
)
}
pub fn load_all(&self, ancestors: &Ancestors) -> Vec<(Pubkey, Account, Slot)> {
self.accounts_db.scan_accounts(
ancestors,

View File

@ -19,7 +19,7 @@
//! commit for each slot entry would be indexed.
use crate::{
accounts_index::{AccountsIndex, Ancestors, SlotList, SlotSlice},
accounts_index::{AccountIndex, AccountsIndex, Ancestors, IndexKey, SlotList, SlotSlice},
append_vec::{AppendVec, StoredAccount, StoredMeta},
};
use blake3::traits::digest::Digest;
@ -41,7 +41,7 @@ use solana_sdk::{
use std::convert::TryFrom;
use std::{
boxed::Box,
collections::{HashMap, HashSet},
collections::{BTreeMap, HashMap, HashSet},
convert::TryInto,
io::{Error as IOError, Result as IOResult},
ops::RangeBounds,
@ -467,6 +467,8 @@ pub struct AccountsDB {
stats: AccountsStats,
pub cluster_type: Option<ClusterType>,
pub account_indexes: HashSet<AccountIndex>,
}
#[derive(Debug, Default)]
@ -545,17 +547,27 @@ impl Default for AccountsDB {
frozen_accounts: HashMap::new(),
stats: AccountsStats::default(),
cluster_type: None,
account_indexes: HashSet::new(),
}
}
}
impl AccountsDB {
pub fn new(paths: Vec<PathBuf>, cluster_type: &ClusterType) -> Self {
AccountsDB::new_with_indexes(paths, cluster_type, HashSet::new())
}
pub fn new_with_indexes(
paths: Vec<PathBuf>,
cluster_type: &ClusterType,
account_indexes: HashSet<AccountIndex>,
) -> Self {
let new = if !paths.is_empty() {
Self {
paths,
temp_paths: None,
cluster_type: Some(*cluster_type),
account_indexes,
..Self::default()
}
} else {
@ -566,6 +578,7 @@ impl AccountsDB {
paths,
temp_paths: Some(temp_dirs),
cluster_type: Some(*cluster_type),
account_indexes,
..Self::default()
}
};
@ -637,6 +650,7 @@ impl AccountsDB {
&pubkey,
&mut reclaims,
max_clean_root,
&self.account_indexes,
);
}
reclaims
@ -744,7 +758,9 @@ impl AccountsDB {
let mut dead_keys = Vec::new();
for (pubkey, slots_set) in pubkey_to_slot_set {
let (new_reclaims, is_empty) = self.accounts_index.purge_exact(&pubkey, slots_set);
let (new_reclaims, is_empty) =
self.accounts_index
.purge_exact(&pubkey, slots_set, &self.account_indexes);
if is_empty {
dead_keys.push(pubkey);
}
@ -917,7 +933,8 @@ impl AccountsDB {
let (reclaims, dead_keys) = self.purge_keys_exact(pubkey_to_slot_set);
self.accounts_index.handle_dead_keys(&dead_keys);
self.accounts_index
.handle_dead_keys(&dead_keys, &self.account_indexes);
self.handle_reclaims(&reclaims, None, false, None);
@ -1388,6 +1405,30 @@ impl AccountsDB {
collector
}
pub fn index_scan_accounts<F, A>(
&self,
ancestors: &Ancestors,
index_key: IndexKey,
scan_func: F,
) -> A
where
F: Fn(&mut A, Option<(&Pubkey, Account, Slot)>),
A: Default,
{
let mut collector = A::default();
self.accounts_index.index_scan_accounts(
ancestors,
index_key,
|pubkey, (account_info, slot)| {
let account_slot = self
.get_account_from_storage(slot, account_info)
.map(|account| (pubkey, account, slot));
scan_func(&mut collector, account_slot)
},
);
collector
}
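A hypothetical caller (names like `accounts_db`, `ancestors`, and `mint_key` are illustrative; the pattern mirrors the bank tests later in this commit), collecting the pubkeys indexed under one mint:

// `A::default()` (here an empty Vec) is created inside index_scan_accounts
// and threaded through the per-account callback.
let pubkeys: Vec<Pubkey> = accounts_db.index_scan_accounts(
    &ancestors,
    IndexKey::SplTokenMint(mint_key),
    |collector: &mut Vec<Pubkey>, account_slot| {
        if let Some((pubkey, _account, _slot)) = account_slot {
            collector.push(*pubkey);
        }
    },
);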
/// Scan a specific slot through all the account storage in parallel
pub fn scan_account_storage<F, B>(&self, slot: Slot, scan_func: F) -> Vec<B>
where
@ -1413,8 +1454,8 @@ impl AccountsDB {
.map(|storage| {
let accounts = storage.accounts.accounts(0);
let mut retval = B::default();
accounts.iter().for_each(|stored_account| {
scan_func(stored_account, storage.append_vec_id(), &mut retval)
accounts.into_iter().for_each(|stored_account| {
scan_func(&stored_account, storage.append_vec_id(), &mut retval)
});
retval
})
@ -1845,6 +1886,7 @@ impl AccountsDB {
remove_slot,
pubkey,
&mut reclaims,
&self.account_indexes,
);
}
}
@ -2510,8 +2552,15 @@ impl AccountsDB {
let mut reclaims = SlotList::<AccountInfo>::with_capacity(infos.len() * 2);
for (info, pubkey_account) in infos.into_iter().zip(accounts.iter()) {
let pubkey = pubkey_account.0;
self.accounts_index
.upsert(slot, pubkey, info, &mut reclaims);
self.accounts_index.upsert(
slot,
pubkey,
&pubkey_account.1.owner,
&pubkey_account.1.data,
&self.account_indexes,
info,
&mut reclaims,
);
}
reclaims
}
@ -2924,6 +2973,8 @@ impl AccountsDB {
}
pub fn generate_index(&self) {
type AccountsMap<'a> =
DashMap<Pubkey, Mutex<BTreeMap<u64, (AppendVecId, StoredAccount<'a>)>>>;
let mut slots = self.storage.all_slots();
#[allow(clippy::stable_sort_primitive)]
slots.sort();
@ -2935,52 +2986,68 @@ impl AccountsDB {
info!("generating index: {}/{} slots...", index, slots.len());
last_log_update = now;
}
let accumulator: Vec<HashMap<Pubkey, Vec<(u64, AccountInfo)>>> = self
.scan_account_storage_inner(
*slot,
|stored_account: &StoredAccount,
store_id: AppendVecId,
accum: &mut HashMap<Pubkey, Vec<(u64, AccountInfo)>>| {
let accounts_map: AccountsMap = AccountsMap::new();
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
.get_slot_stores(*slot)
.map(|res| res.read().unwrap().values().cloned().collect())
.unwrap_or_default();
self.thread_pool.install(|| {
storage_maps.par_iter().for_each(|storage| {
let accounts = storage.accounts.accounts(0);
accounts.into_iter().for_each(|stored_account| {
let entry = accounts_map
.get(&stored_account.meta.pubkey)
.unwrap_or_else(|| {
accounts_map
.entry(stored_account.meta.pubkey)
.or_insert(Mutex::new(BTreeMap::new()))
.downgrade()
});
assert!(
// There should only be one update per write version for a specific slot
// and account
entry
.lock()
.unwrap()
.insert(
stored_account.meta.write_version,
(storage.append_vec_id(), stored_account)
)
.is_none()
);
})
});
});
// Need to restore indexes even with older write versions which may
// be shielding other accounts. When they are then purged, the
// original non-shielded account value will be visible when the account
// is restored from the append-vec
if !accounts_map.is_empty() {
let mut _reclaims: Vec<(u64, AccountInfo)> = vec![];
for (pubkey, account_infos) in accounts_map.into_iter() {
for (_, (store_id, stored_account)) in
account_infos.into_inner().unwrap().into_iter()
{
let account_info = AccountInfo {
store_id,
offset: stored_account.offset,
lamports: stored_account.account_meta.lamports,
};
let entry = accum
.entry(stored_account.meta.pubkey)
.or_insert_with(Vec::new);
entry.push((stored_account.meta.write_version, account_info));
},
);
let mut accounts_map: HashMap<Pubkey, Vec<(u64, AccountInfo)>> = HashMap::new();
for accumulator_entry in accumulator.iter() {
for (pubkey, storage_entry) in accumulator_entry {
let entry = accounts_map.entry(*pubkey).or_insert_with(Vec::new);
entry.extend(storage_entry.iter().cloned());
}
}
// Need to restore indexes even with older write versions which may
// be shielding other accounts. When they are then purged, the
// original non-shielded account value will be visible when the account
// is restored from the append-vec
if !accumulator.is_empty() {
let mut _reclaims: Vec<(u64, AccountInfo)> = vec![];
for (pubkey, account_infos) in accounts_map.iter_mut() {
account_infos.sort_by(|a, b| a.0.cmp(&b.0));
for (_, account_info) in account_infos {
self.accounts_index.upsert(
*slot,
pubkey,
account_info.clone(),
&pubkey,
&stored_account.account_meta.owner,
&stored_account.data,
&self.account_indexes,
account_info,
&mut _reclaims,
);
}
}
}
}
// Need to add these last, otherwise older updates will be cleaned
for slot in slots {
self.accounts_index.add_root(slot);
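The per-pubkey BTreeMap above is keyed by write version, so the subsequent upserts replay each pubkey's updates in ascending order; a toy standalone illustration of that ordering property (not from this commit):

use std::collections::BTreeMap;

// Entries inserted out of order iterate back sorted by write version,
// so the final upsert for a pubkey reflects its newest update.
let mut by_version: BTreeMap<u64, &str> = BTreeMap::new();
by_version.insert(7, "newest");
by_version.insert(3, "older");
let replay: Vec<_> = by_version.into_iter().collect();
assert_eq!(replay, vec![(3, "older"), (7, "newest")]);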
@ -3083,9 +3150,13 @@ impl AccountsDB {
pub mod tests {
// TODO: all the bank tests are bank specific, issue: 2194
use super::*;
use crate::{accounts_index::RefCount, append_vec::AccountMeta};
use crate::{
accounts_index::tests::*, accounts_index::RefCount, append_vec::AccountMeta,
inline_spl_token_v2_0,
};
use assert_matches::assert_matches;
use rand::{thread_rng, Rng};
use solana_sdk::pubkey::PUBKEY_BYTES;
use solana_sdk::{account::Account, hash::HASH_BYTES};
use std::{fs, iter::FromIterator, str::FromStr};
@ -3882,11 +3953,26 @@ pub mod tests {
fn test_clean_old_with_both_normal_and_zero_lamport_accounts() {
solana_logger::setup();
let accounts = AccountsDB::new(Vec::new(), &ClusterType::Development);
let accounts = AccountsDB::new_with_indexes(
Vec::new(),
&ClusterType::Development,
spl_token_mint_index_enabled(),
);
let pubkey1 = solana_sdk::pubkey::new_rand();
let pubkey2 = solana_sdk::pubkey::new_rand();
let normal_account = Account::new(1, 0, &Account::default().owner);
let zero_account = Account::new(0, 0, &Account::default().owner);
// Set up account to be added to secondary index
let mint_key = Pubkey::new_unique();
let mut account_data_with_mint =
vec![0; inline_spl_token_v2_0::state::Account::get_packed_len()];
account_data_with_mint[..PUBKEY_BYTES].clone_from_slice(&(mint_key.clone().to_bytes()));
let mut normal_account = Account::new(1, 0, &Account::default().owner);
normal_account.owner = inline_spl_token_v2_0::id();
normal_account.data = account_data_with_mint.clone();
let mut zero_account = Account::new(0, 0, &Account::default().owner);
zero_account.owner = inline_spl_token_v2_0::id();
zero_account.data = account_data_with_mint;
//store an account
accounts.store(0, &[(&pubkey1, &normal_account)]);
@ -3905,6 +3991,19 @@ pub mod tests {
assert_eq!(accounts.alive_account_count_in_store(1), 1);
assert_eq!(accounts.alive_account_count_in_store(2), 1);
// Secondary index should still find both pubkeys
let mut found_accounts = HashSet::new();
accounts.accounts_index.index_scan_accounts(
&HashMap::new(),
IndexKey::SplTokenMint(mint_key),
|key, _| {
found_accounts.insert(*key);
},
);
assert_eq!(found_accounts.len(), 2);
assert!(found_accounts.contains(&pubkey1));
assert!(found_accounts.contains(&pubkey2));
accounts.clean_accounts(None);
//both zero lamport and normal accounts are cleaned up
@ -3915,9 +4014,18 @@ pub mod tests {
assert_eq!(accounts.alive_account_count_in_store(1), 0);
assert_eq!(accounts.alive_account_count_in_store(2), 1);
// Pubkey 1, a zero lamport account, should no longer exist in accounts index
// because it has been removed
// `pubkey1`, a zero lamport account, should no longer exist in accounts index
// because it has been removed by the clean
assert!(accounts.accounts_index.get(&pubkey1, None, None).is_none());
// Secondary index should have purged `pubkey1` as well
let mut found_accounts = vec![];
accounts.accounts_index.index_scan_accounts(
&HashMap::new(),
IndexKey::SplTokenMint(mint_key),
|key, _| found_accounts.push(*key),
);
assert_eq!(found_accounts, vec![pubkey2]);
}
#[test]
@ -4503,7 +4611,8 @@ pub mod tests {
let account2 = Account::new(3, 0, &key);
db.store(2, &[(&key1, &account2)]);
db.accounts_index.handle_dead_keys(&dead_keys);
db.accounts_index
.handle_dead_keys(&dead_keys, &HashSet::new());
db.print_accounts_stats("post");
let ancestors = vec![(2, 0)].into_iter().collect();
@ -5408,12 +5517,60 @@ pub mod tests {
lamports: 0,
};
let mut reclaims = vec![];
accounts_index.upsert(0, &key0, info0, &mut reclaims);
accounts_index.upsert(1, &key0, info1.clone(), &mut reclaims);
accounts_index.upsert(1, &key1, info1, &mut reclaims);
accounts_index.upsert(2, &key1, info2.clone(), &mut reclaims);
accounts_index.upsert(2, &key2, info2, &mut reclaims);
accounts_index.upsert(3, &key2, info3, &mut reclaims);
accounts_index.upsert(
0,
&key0,
&Pubkey::default(),
&[],
&HashSet::new(),
info0,
&mut reclaims,
);
accounts_index.upsert(
1,
&key0,
&Pubkey::default(),
&[],
&HashSet::new(),
info1.clone(),
&mut reclaims,
);
accounts_index.upsert(
1,
&key1,
&Pubkey::default(),
&[],
&HashSet::new(),
info1,
&mut reclaims,
);
accounts_index.upsert(
2,
&key1,
&Pubkey::default(),
&[],
&HashSet::new(),
info2.clone(),
&mut reclaims,
);
accounts_index.upsert(
2,
&key2,
&Pubkey::default(),
&[],
&HashSet::new(),
info2,
&mut reclaims,
);
accounts_index.upsert(
3,
&key2,
&Pubkey::default(),
&[],
&HashSet::new(),
info3,
&mut reclaims,
);
accounts_index.add_root(0);
accounts_index.add_root(1);
accounts_index.add_root(2);

File diff suppressed because it is too large

View File

@ -8,10 +8,11 @@ use crate::{
TransactionLoadResult, TransactionLoaders,
},
accounts_db::{ErrorCounters, SnapshotStorages},
accounts_index::Ancestors,
accounts_index::{AccountIndex, Ancestors, IndexKey},
blockhash_queue::BlockhashQueue,
builtins::{self, ActivationType},
epoch_stakes::{EpochStakes, NodeVoteAccounts},
inline_spl_token_v2_0,
instruction_recorder::InstructionRecorder,
log_collector::LogCollector,
message_processor::{Executors, MessageProcessor},
@ -86,29 +87,6 @@ use std::{
time::Duration,
};
// Partial SPL Token v2.0.x declarations inlined to avoid an external dependency on the spl-token crate
pub mod inline_spl_token_v2_0 {
solana_sdk::declare_id!("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA");
pub mod native_mint {
solana_sdk::declare_id!("So11111111111111111111111111111111111111112");
/*
Mint {
mint_authority: COption::None,
supply: 0,
decimals: 9,
is_initialized: true,
freeze_authority: COption::None,
}
*/
pub const ACCOUNT_DATA: [u8; 82] = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
}
}
pub const SECONDS_PER_YEAR: f64 = 365.25 * 24.0 * 60.0 * 60.0;
pub const MAX_LEADER_SCHEDULE_STAKES: Epoch = 5;
@ -311,6 +289,7 @@ pub struct BankRc {
#[cfg(RUSTC_WITH_SPECIALIZATION)]
use solana_frozen_abi::abi_example::AbiExample;
#[cfg(RUSTC_WITH_SPECIALIZATION)]
impl AbiExample for BankRc {
fn example() -> Self {
@ -880,7 +859,22 @@ impl Default for BlockhashQueue {
impl Bank {
pub fn new(genesis_config: &GenesisConfig) -> Self {
Self::new_with_paths(&genesis_config, Vec::new(), &[], None, None)
Self::new_with_paths(&genesis_config, Vec::new(), &[], None, None, HashSet::new())
}
#[cfg(test)]
pub(crate) fn new_with_indexes(
genesis_config: &GenesisConfig,
account_indexes: HashSet<AccountIndex>,
) -> Self {
Self::new_with_paths(
&genesis_config,
Vec::new(),
&[],
None,
None,
account_indexes,
)
}
pub fn new_with_paths(
@ -889,13 +883,18 @@ impl Bank {
frozen_account_pubkeys: &[Pubkey],
debug_keys: Option<Arc<HashSet<Pubkey>>>,
additional_builtins: Option<&Builtins>,
account_indexes: HashSet<AccountIndex>,
) -> Self {
let mut bank = Self::default();
bank.ancestors.insert(bank.slot(), 0);
bank.transaction_debug_keys = debug_keys;
bank.cluster_type = Some(genesis_config.cluster_type);
bank.rc.accounts = Arc::new(Accounts::new(paths, &genesis_config.cluster_type));
bank.rc.accounts = Arc::new(Accounts::new_with_indexes(
paths,
&genesis_config.cluster_type,
account_indexes,
));
bank.process_genesis_config(genesis_config);
bank.finish_init(genesis_config, additional_builtins);
@ -3985,6 +3984,26 @@ impl Bank {
.load_by_program(&self.ancestors, program_id)
}
pub fn get_filtered_program_accounts<F: Fn(&Account) -> bool>(
&self,
program_id: &Pubkey,
filter: F,
) -> Vec<(Pubkey, Account)> {
self.rc
.accounts
.load_by_program_with_filter(&self.ancestors, program_id, filter)
}
pub fn get_filtered_indexed_accounts<F: Fn(&Account) -> bool>(
&self,
index_key: &IndexKey,
filter: F,
) -> Vec<(Pubkey, Account)> {
self.rc
.accounts
.load_by_index_key_with_filter(&self.ancestors, index_key, filter)
}
pub fn get_all_accounts_with_modified_slots(&self) -> Vec<(Pubkey, Account, Slot)> {
self.rc.accounts.load_all(&self.ancestors)
}
@ -6702,7 +6721,7 @@ pub(crate) mod tests {
.accounts
.accounts_db
.accounts_index
.purge(&zero_lamport_pubkey);
.purge_roots(&zero_lamport_pubkey);
let some_slot = 1000;
let bank2_with_zero = Arc::new(Bank::new_from_parent(
@ -8679,6 +8698,53 @@ pub(crate) mod tests {
assert_eq!(bank3.get_program_accounts(&program_id).len(), 2);
}
#[test]
fn test_get_filtered_indexed_accounts() {
let (genesis_config, _mint_keypair) = create_genesis_config(500);
let mut account_indexes = HashSet::new();
account_indexes.insert(AccountIndex::ProgramId);
let bank = Arc::new(Bank::new_with_indexes(&genesis_config, account_indexes));
let address = Pubkey::new_unique();
let program_id = Pubkey::new_unique();
let account = Account::new(1, 0, &program_id);
bank.store_account(&address, &account);
let indexed_accounts =
bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |_| true);
assert_eq!(indexed_accounts.len(), 1);
assert_eq!(indexed_accounts[0], (address, account));
// Even though the account is re-stored in the bank (and the index) under a new program id,
// it is still present in the index under the original program id as well. This
// demonstrates the need for a redundant post-processing filter.
let another_program_id = Pubkey::new_unique();
let new_account = Account::new(1, 0, &another_program_id);
let bank = Arc::new(new_from_parent(&bank));
bank.store_account(&address, &new_account);
let indexed_accounts =
bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |_| true);
assert_eq!(indexed_accounts.len(), 1);
assert_eq!(indexed_accounts[0], (address, new_account.clone()));
let indexed_accounts =
bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(another_program_id), |_| true);
assert_eq!(indexed_accounts.len(), 1);
assert_eq!(indexed_accounts[0], (address, new_account.clone()));
// Post-processing filter
let indexed_accounts = bank
.get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |account| {
account.owner == program_id
});
assert!(indexed_accounts.is_empty());
let indexed_accounts = bank
.get_filtered_indexed_accounts(&IndexKey::ProgramId(another_program_id), |account| {
account.owner == another_program_id
});
assert_eq!(indexed_accounts.len(), 1);
assert_eq!(indexed_accounts[0], (address, new_account));
}
#[test]
fn test_status_cache_ancestors() {
solana_logger::setup();
@ -11041,6 +11107,7 @@ pub(crate) mod tests {
&[],
None,
Some(&builtins),
HashSet::new(),
));
// move to next epoch to create now deprecated rewards sysvar intentionally
let bank1 = Arc::new(Bank::new_from_parent(

View File

@ -0,0 +1,46 @@
// Partial SPL Token v2.0.x declarations inlined to avoid an external dependency on the spl-token crate
solana_sdk::declare_id!("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA");
/*
spl_token::state::Account {
mint: Pubkey,
owner: Pubkey,
amount: u64,
delegate: COption<Pubkey>,
state: AccountState,
is_native: COption<u64>,
delegated_amount: u64,
close_authority: COption<Pubkey>,
}
*/
pub const SPL_TOKEN_ACCOUNT_MINT_OFFSET: usize = 0;
pub const SPL_TOKEN_ACCOUNT_OWNER_OFFSET: usize = 32;
pub mod state {
const LEN: usize = 165;
pub struct Account;
impl Account {
pub fn get_packed_len() -> usize {
LEN
}
}
}
pub mod native_mint {
solana_sdk::declare_id!("So11111111111111111111111111111111111111112");
/*
Mint {
mint_authority: COption::None,
supply: 0,
decimals: 9,
is_initialized: true,
freeze_authority: COption::None,
}
*/
pub const ACCOUNT_DATA: [u8; 82] = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
}
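As a sanity check on the offsets above, a hypothetical helper (not in this commit) that reads the mint and owner straight out of raw token-account data:

use solana_sdk::pubkey::{Pubkey, PUBKEY_BYTES};

// Returns None unless the data is exactly one packed spl-token Account.
fn token_account_mint_and_owner(data: &[u8]) -> Option<(Pubkey, Pubkey)> {
    if data.len() != state::Account::get_packed_len() {
        return None;
    }
    let mint = Pubkey::new(&data[SPL_TOKEN_ACCOUNT_MINT_OFFSET..SPL_TOKEN_ACCOUNT_MINT_OFFSET + PUBKEY_BYTES]);
    let owner = Pubkey::new(&data[SPL_TOKEN_ACCOUNT_OWNER_OFFSET..SPL_TOKEN_ACCOUNT_OWNER_OFFSET + PUBKEY_BYTES]);
    Some((mint, owner))
}

This mirrors what the accounts-index upsert presumably does with the owner and data it now receives (that file's diff is suppressed above as too large).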

View File

@ -15,12 +15,14 @@ pub mod commitment;
pub mod epoch_stakes;
pub mod genesis_utils;
pub mod hardened_unpack;
pub mod inline_spl_token_v2_0;
pub mod instruction_recorder;
pub mod loader_utils;
pub mod log_collector;
pub mod message_processor;
mod native_loader;
pub mod rent_collector;
pub mod secondary_index;
pub mod serde_snapshot;
pub mod snapshot_package;
pub mod snapshot_utils;

View File

@ -0,0 +1,324 @@
use dashmap::{mapref::entry::Entry::Occupied, DashMap};
use log::*;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::{
collections::{hash_map, HashMap, HashSet},
fmt::Debug,
sync::{Arc, RwLock},
};
pub type SecondaryReverseIndexEntry = RwLock<HashMap<Slot, Pubkey>>;
pub trait SecondaryIndexEntry: Debug {
fn get_or_create(&self, key: &Pubkey, f: &dyn Fn(&RwLock<HashSet<Slot>>));
fn get<T>(&self, key: &Pubkey, f: &dyn Fn(Option<&RwLock<HashSet<Slot>>>) -> T) -> T;
fn remove_key_if_empty(&self, key: &Pubkey);
fn is_empty(&self) -> bool;
fn keys(&self) -> Vec<Pubkey>;
fn len(&self) -> usize;
}
#[derive(Debug, Default)]
pub struct DashMapSecondaryIndexEntry {
pubkey_to_slot_set: DashMap<Pubkey, RwLock<HashSet<Slot>>>,
}
impl SecondaryIndexEntry for DashMapSecondaryIndexEntry {
fn get_or_create(&self, key: &Pubkey, f: &dyn Fn(&RwLock<HashSet<Slot>>)) {
let slot_set = self.pubkey_to_slot_set.get(key).unwrap_or_else(|| {
self.pubkey_to_slot_set
.entry(*key)
.or_insert(RwLock::new(HashSet::new()))
.downgrade()
});
f(&slot_set)
}
fn get<T>(&self, key: &Pubkey, f: &dyn Fn(Option<&RwLock<HashSet<Slot>>>) -> T) -> T {
let slot_set = self.pubkey_to_slot_set.get(key);
f(slot_set.as_ref().map(|entry_ref| entry_ref.value()))
}
fn remove_key_if_empty(&self, key: &Pubkey) {
if let Occupied(key_entry) = self.pubkey_to_slot_set.entry(*key) {
// Delete the `key` if the slot set is empty
let slot_set = key_entry.get();
// Write lock on `key_entry` above through the `entry`
// means nobody else has access to this lock at this time,
// so this check for empty -> remove() is atomic
if slot_set.read().unwrap().is_empty() {
key_entry.remove();
}
}
}
fn is_empty(&self) -> bool {
self.pubkey_to_slot_set.is_empty()
}
fn keys(&self) -> Vec<Pubkey> {
self.pubkey_to_slot_set
.iter()
.map(|entry_ref| *entry_ref.key())
.collect()
}
fn len(&self) -> usize {
self.pubkey_to_slot_set.len()
}
}
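The get_or_create path above uses a read-mostly idiom: try a shared `get` first, and only on a miss take the exclusive `entry` lock, inserting and immediately downgrading back to a read guard. A standalone sketch of the same dashmap pattern (illustrative, not from this commit):

use dashmap::mapref::one::Ref;
use dashmap::DashMap;

// Common case: a shared read guard. First-insert case: an exclusive entry
// guard that is downgraded, so callers always end up with a read guard.
fn get_or_default(map: &DashMap<u64, String>, key: u64) -> Ref<'_, u64, String> {
    map.get(&key).unwrap_or_else(|| {
        map.entry(key)
            .or_insert_with(String::new)
            .downgrade()
    })
}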
#[derive(Debug, Default)]
pub struct RwLockSecondaryIndexEntry {
pubkey_to_slot_set: RwLock<HashMap<Pubkey, Arc<RwLock<HashSet<Slot>>>>>,
}
impl SecondaryIndexEntry for RwLockSecondaryIndexEntry {
fn get_or_create(&self, key: &Pubkey, f: &dyn Fn(&RwLock<HashSet<Slot>>)) {
let slot_set = self.pubkey_to_slot_set.read().unwrap().get(key).cloned();
let slot_set = {
if let Some(slot_set) = slot_set {
slot_set
} else {
self.pubkey_to_slot_set
.write()
.unwrap()
.entry(*key)
.or_insert_with(|| Arc::new(RwLock::new(HashSet::new())))
.clone()
}
};
f(&slot_set)
}
fn get<T>(&self, key: &Pubkey, f: &dyn Fn(Option<&RwLock<HashSet<Slot>>>) -> T) -> T {
let slot_set = self.pubkey_to_slot_set.read().unwrap().get(key).cloned();
f(slot_set.as_deref())
}
fn remove_key_if_empty(&self, key: &Pubkey) {
if let hash_map::Entry::Occupied(key_entry) =
self.pubkey_to_slot_set.write().unwrap().entry(*key)
{
// Delete the `key` if the slot set is empty
let slot_set = key_entry.get();
// Write lock on `key_entry` above through the `entry`
// means nobody else has access to this lock at this time,
// so this check for empty -> remove() is atomic
if slot_set.read().unwrap().is_empty() {
key_entry.remove();
}
}
}
fn is_empty(&self) -> bool {
self.pubkey_to_slot_set.read().unwrap().is_empty()
}
fn keys(&self) -> Vec<Pubkey> {
self.pubkey_to_slot_set
.read()
.unwrap()
.keys()
.cloned()
.collect()
}
fn len(&self) -> usize {
self.pubkey_to_slot_set.read().unwrap().len()
}
}
#[derive(Debug, Default)]
pub struct SecondaryIndex<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send> {
// Map from index keys to index values
pub index: DashMap<Pubkey, SecondaryIndexEntryType>,
// Map from index values back to index keys, used for cleanup.
// Alternative is to store Option<Pubkey> in each AccountInfo in the
// AccountsIndex if something is an SPL account with a mint, but then
// every AccountInfo would have to allocate `Option<Pubkey>`
pub reverse_index: DashMap<Pubkey, SecondaryReverseIndexEntry>,
}
impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
SecondaryIndex<SecondaryIndexEntryType>
{
pub fn insert(&self, key: &Pubkey, inner_key: &Pubkey, slot: Slot) {
{
let pubkeys_map = self.index.get(key).unwrap_or_else(|| {
self.index
.entry(*key)
.or_insert(SecondaryIndexEntryType::default())
.downgrade()
});
pubkeys_map.get_or_create(inner_key, &|slots_set: &RwLock<HashSet<Slot>>| {
let contains_key = slots_set.read().unwrap().contains(&slot);
if !contains_key {
slots_set.write().unwrap().insert(slot);
}
});
}
let prev_key = {
let slots_map = self.reverse_index.get(inner_key).unwrap_or_else(|| {
self.reverse_index
.entry(*inner_key)
.or_insert(RwLock::new(HashMap::new()))
.downgrade()
});
let should_insert = {
// Most of the time, key should already exist and match
// the one in the update
if let Some(existing_key) = slots_map.read().unwrap().get(&slot) {
existing_key != key
} else {
// If there is no key yet, then insert
true
}
};
if should_insert {
slots_map.write().unwrap().insert(slot, *key)
} else {
None
}
};
if let Some(prev_key) = prev_key {
// If the inner key was moved to a different primary key, remove
// the previous index entry.
// Check is necessary because another thread's writes could feasibly be
// interleaved between `should_insert = { ... slots_map.get(...) ... }` and
// `prev_key = { ... slots_map.insert(...) ... }`
// Currently this isn't possible due to current AccountsIndex's (pubkey, slot)-per-thread
// exclusive-locking, but check is here for future-proofing a more relaxed implementation
if prev_key != *key {
self.remove_index_entries(&prev_key, inner_key, &[slot]);
}
}
}
pub fn remove_index_entries(&self, key: &Pubkey, inner_key: &Pubkey, slots: &[Slot]) {
let is_key_empty = if let Some(inner_key_map) = self.index.get(&key) {
// Delete the slot from the slot set
let is_inner_key_empty =
inner_key_map.get(&inner_key, &|slot_set: Option<&RwLock<HashSet<Slot>>>| {
if let Some(slot_set) = slot_set {
let mut w_slot_set = slot_set.write().unwrap();
for slot in slots.iter() {
let is_present = w_slot_set.remove(slot);
if !is_present {
warn!("Reverse index is missing previous entry for key {}, inner_key: {}, slot: {}",
key, inner_key, slot);
}
}
w_slot_set.is_empty()
} else {
false
}
});
// Check if `key` is empty
if is_inner_key_empty {
// Write lock on `inner_key_entry` above through the `entry`
// means nobody else has access to this lock at this time,
// so this check for empty -> remove() is atomic
inner_key_map.remove_key_if_empty(inner_key);
inner_key_map.is_empty()
} else {
false
}
} else {
false
};
// Delete the `key` if the set of inner keys is empty
if is_key_empty {
// Other threads may have interleaved writes to this `key`,
// so double-check again for its emptiness
if let Occupied(key_entry) = self.index.entry(*key) {
if key_entry.get().is_empty() {
key_entry.remove();
}
}
}
}
// Specifying `slots_to_remove` as `Some` will only remove keys for those specific slots
// found for the `inner_key` in the reverse index. Otherwise, passing `None`
// will remove all keys that are found for the `inner_key` in the reverse index.
// Note that passing `None` is dangerous unless you're sure there are no other competing threads
// writing updates to the index for this Pubkey at the same time!
pub fn remove_by_inner_key(&self, inner_key: &Pubkey, slots_to_remove: Option<&HashSet<Slot>>) {
// Save off which keys in `self.index` had slots removed so we can remove them
// after we purge the reverse index
let mut key_to_removed_slots: HashMap<Pubkey, Vec<Slot>> = HashMap::new();
// Check if the entry for `inner_key` in the reverse index is empty
// and can be removed
let needs_remove = {
if let Some(slots_to_remove) = slots_to_remove {
self.reverse_index
.get(inner_key)
.map(|slots_map| {
// Ideally we use a concurrent map here as well to prevent clean
// from blocking writes, but memory usage of DashMap is high
let mut w_slots_map = slots_map.value().write().unwrap();
for slot in slots_to_remove.iter() {
if let Some(removed_key) = w_slots_map.remove(slot) {
key_to_removed_slots
.entry(removed_key)
.or_default()
.push(*slot);
}
}
w_slots_map.is_empty()
})
.unwrap_or(false)
} else {
if let Some((_, removed_slot_map)) = self.reverse_index.remove(inner_key) {
for (slot, removed_key) in removed_slot_map.into_inner().unwrap().into_iter() {
key_to_removed_slots
.entry(removed_key)
.or_default()
.push(slot);
}
}
// We just removed the key, no need to remove it again
false
}
};
if needs_remove {
// Other threads may have interleaved writes to this `inner_key`, between
// releasing the `self.reverse_index.get(inner_key)` lock and now,
// so double-check again for emptiness
if let Occupied(slot_map) = self.reverse_index.entry(*inner_key) {
if slot_map.get().read().unwrap().is_empty() {
slot_map.remove();
}
}
}
// Remove this value from those keys
for (key, slots) in key_to_removed_slots {
self.remove_index_entries(&key, inner_key, &slots);
}
}
pub fn get(&self, key: &Pubkey) -> Vec<Pubkey> {
if let Some(inner_keys_map) = self.index.get(key) {
inner_keys_map.keys()
} else {
vec![]
}
}
}
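Putting the two maps together, a minimal test-style sketch (assuming this module's types are in scope): index one token account under its mint at two slots, then remove both slots and watch the forward and reverse entries get cleaned up:

use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::collections::HashSet;

let index: SecondaryIndex<DashMapSecondaryIndexEntry> = SecondaryIndex::default();
let mint = Pubkey::new_unique();
let token_account = Pubkey::new_unique();
// Forward: mint -> {token_account -> {0, 1}};
// reverse: token_account -> {0 -> mint, 1 -> mint}
index.insert(&mint, &token_account, 0);
index.insert(&mint, &token_account, 1);
assert_eq!(index.get(&mint), vec![token_account]);
// Removing every slot empties the slot set, which purges the inner key,
// which in turn purges the now-empty outer `mint` entry.
let slots: HashSet<Slot> = [0, 1].iter().copied().collect();
index.remove_by_inner_key(&token_account, Some(&slots));
assert!(index.get(&mint).is_empty());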

View File

@ -2,7 +2,7 @@ use {
crate::{
accounts::Accounts,
accounts_db::{AccountStorageEntry, AccountsDB, AppendVecId, BankHashInfo},
accounts_index::Ancestors,
accounts_index::{AccountIndex, Ancestors},
append_vec::AppendVec,
bank::{Bank, BankFieldsToDeserialize, BankRc, Builtins},
blockhash_queue::BlockhashQueue,
@ -126,6 +126,7 @@ pub(crate) fn bank_from_stream<R, P>(
frozen_account_pubkeys: &[Pubkey],
debug_keys: Option<Arc<HashSet<Pubkey>>>,
additional_builtins: Option<&Builtins>,
account_indexes: HashSet<AccountIndex>,
) -> std::result::Result<Bank, Error>
where
R: Read,
@ -144,6 +145,7 @@ where
append_vecs_path,
debug_keys,
additional_builtins,
account_indexes,
)?;
Ok(bank)
}};
@ -230,6 +232,7 @@ fn reconstruct_bank_from_fields<E, P>(
append_vecs_path: P,
debug_keys: Option<Arc<HashSet<Pubkey>>>,
additional_builtins: Option<&Builtins>,
account_indexes: HashSet<AccountIndex>,
) -> Result<Bank, Error>
where
E: Into<AccountStorageEntry>,
@ -240,6 +243,7 @@ where
account_paths,
append_vecs_path,
&genesis_config.cluster_type,
account_indexes,
)?;
accounts_db.freeze_accounts(&bank_fields.ancestors, frozen_account_pubkeys);
@ -260,13 +264,14 @@ fn reconstruct_accountsdb_from_fields<E, P>(
account_paths: &[PathBuf],
stream_append_vecs_path: P,
cluster_type: &ClusterType,
account_indexes: HashSet<AccountIndex>,
) -> Result<AccountsDB, Error>
where
E: Into<AccountStorageEntry>,
P: AsRef<Path>,
{
let mut accounts_db = AccountsDB::new(account_paths.to_vec(), cluster_type);
let mut accounts_db =
AccountsDB::new_with_indexes(account_paths.to_vec(), cluster_type, account_indexes);
let AccountsDbFields(storage, version, slot, bank_hash_info) = accounts_db_fields;
// convert to two level map of slot -> id -> account storage entry

View File

@ -69,6 +69,7 @@ where
account_paths,
stream_append_vecs_path,
&ClusterType::Development,
HashSet::new(),
)
}
@ -212,6 +213,7 @@ fn test_bank_serialize_style(serde_style: SerdeStyle) {
&[],
None,
None,
HashSet::new(),
)
.unwrap();
dbank.src = ref_sc;

View File

@ -1,4 +1,5 @@
use crate::{
accounts_index::AccountIndex,
bank::{Bank, BankSlotDelta, Builtins},
bank_forks::CompressionType,
hardened_unpack::{unpack_snapshot, UnpackError},
@ -592,6 +593,7 @@ pub fn bank_from_archive<P: AsRef<Path>>(
genesis_config: &GenesisConfig,
debug_keys: Option<Arc<HashSet<Pubkey>>>,
additional_builtins: Option<&Builtins>,
account_indexes: HashSet<AccountIndex>,
) -> Result<Bank> {
// Untar the snapshot into a temporary directory
let unpack_dir = tempfile::Builder::new()
@ -616,6 +618,7 @@ pub fn bank_from_archive<P: AsRef<Path>>(
genesis_config,
debug_keys,
additional_builtins,
account_indexes,
)?;
if !bank.verify_snapshot_bank() {
@ -775,6 +778,7 @@ fn rebuild_bank_from_snapshots<P>(
genesis_config: &GenesisConfig,
debug_keys: Option<Arc<HashSet<Pubkey>>>,
additional_builtins: Option<&Builtins>,
account_indexes: HashSet<AccountIndex>,
) -> Result<Bank>
where
P: AsRef<Path>,
@ -808,6 +812,7 @@ where
frozen_account_pubkeys,
debug_keys,
additional_builtins,
account_indexes,
),
}?)
})?;

View File

@ -3,6 +3,8 @@ use num_derive::{FromPrimitive, ToPrimitive};
use std::{convert::TryFrom, fmt, mem, str::FromStr};
use thiserror::Error;
/// Number of bytes in a pubkey
pub const PUBKEY_BYTES: usize = 32;
/// maximum length of derived `Pubkey` seed
pub const MAX_SEED_LEN: usize = 32;
/// Maximum number of seeds

View File

@ -3,7 +3,7 @@ pub use solana_program::pubkey::*;
/// New random Pubkey for tests and benchmarks.
#[cfg(feature = "full")]
pub fn new_rand() -> Pubkey {
Pubkey::new(&rand::random::<[u8; 32]>())
Pubkey::new(&rand::random::<[u8; PUBKEY_BYTES]>())
}
#[cfg(feature = "full")]

View File

@ -28,6 +28,7 @@ use solana_download_utils::{download_genesis_if_missing, download_snapshot};
use solana_ledger::blockstore_db::BlockstoreRecoveryMode;
use solana_perf::recycler::enable_recycler_warming;
use solana_runtime::{
accounts_index::AccountIndex,
bank_forks::{CompressionType, SnapshotConfig, SnapshotVersion},
hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
snapshot_utils::get_highest_snapshot_archive_path,
@ -1410,7 +1411,16 @@ pub fn main() {
}
Ok(())
})
.help("EXPERIMENTAL: Specify which CPU core PoH is pinned to")
.help("EXPERIMENTAL: Specify which CPU core PoH is pinned to"),
)
.arg(
Arg::with_name("account_indexes")
.long("account-index")
.takes_value(true)
.multiple(true)
.possible_values(&["program-id", "spl-token-owner", "spl-token-mint"])
.value_name("INDEX")
.help("Enable an accounts index, indexed by the selected account field"),
)
.get_matches();
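With the flag defined above, operators opt in per index; an illustrative invocation (other required validator arguments omitted):

solana-validator --account-index program-id --account-index spl-token-owner --account-index spl-token-mint

Each occurrence adds one AccountIndex to the set parsed below; omitting the flag leaves the set empty, so all queries fall back to full program scans.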
@ -1489,6 +1499,17 @@ pub fn main() {
let contact_debug_interval = value_t_or_exit!(matches, "contact_debug_interval", u64);
let account_indexes: HashSet<AccountIndex> = matches
.values_of("account_indexes")
.unwrap_or_default()
.map(|value| match value {
"program-id" => AccountIndex::ProgramId,
"spl-token-mint" => AccountIndex::SplTokenMint,
"spl-token-owner" => AccountIndex::SplTokenOwner,
_ => unreachable!(),
})
.collect();
let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
let mut validator_config = ValidatorConfig {
require_tower: matches.is_present("require_tower"),
@ -1523,6 +1544,7 @@ pub fn main() {
"health_check_slot_distance",
u64
),
account_indexes: account_indexes.clone(),
},
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
(
@ -1569,6 +1591,7 @@ pub fn main() {
no_poh_speed_test: matches.is_present("no_poh_speed_test"),
poh_pinned_cpu_core: value_of(&matches, "poh_pinned_cpu_core")
.unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),
account_indexes,
..ValidatorConfig::default()
};