Handle removing slots during account scans (#17471)

carllin 2021-06-14 21:04:01 -07:00 committed by GitHub
parent 471b34132e
commit ccc013e134
15 changed files with 934 additions and 319 deletions

View File

@ -1,5 +1,5 @@
//! Implementation defined RPC server errors
use thiserror::Error;
use {
crate::rpc_response::RpcSimulateTransactionResult,
jsonrpc_core::{Error, ErrorCode},
@ -17,35 +17,40 @@ pub const JSON_RPC_SERVER_ERROR_NO_SNAPSHOT: i64 = -32008;
pub const JSON_RPC_SERVER_ERROR_LONG_TERM_STORAGE_SLOT_SKIPPED: i64 = -32009;
pub const JSON_RPC_SERVER_ERROR_KEY_EXCLUDED_FROM_SECONDARY_INDEX: i64 = -32010;
pub const JSON_RPC_SERVER_ERROR_TRANSACTION_HISTORY_NOT_AVAILABLE: i64 = -32011;
pub const JSON_RPC_SCAN_ERROR: i64 = -32012;
#[derive(Error, Debug)]
pub enum RpcCustomError {
#[error("BlockCleanedUp")]
BlockCleanedUp {
slot: Slot,
first_available_block: Slot,
},
#[error("SendTransactionPreflightFailure")]
SendTransactionPreflightFailure {
message: String,
result: RpcSimulateTransactionResult,
},
#[error("TransactionSignatureVerificationFailure")]
TransactionSignatureVerificationFailure,
BlockNotAvailable {
slot: Slot,
},
NodeUnhealthy {
num_slots_behind: Option<Slot>,
},
#[error("BlockNotAvailable")]
BlockNotAvailable { slot: Slot },
#[error("NodeUnhealthy")]
NodeUnhealthy { num_slots_behind: Option<Slot> },
#[error("TransactionPrecompileVerificationFailure")]
TransactionPrecompileVerificationFailure(solana_sdk::transaction::TransactionError),
SlotSkipped {
slot: Slot,
},
#[error("SlotSkipped")]
SlotSkipped { slot: Slot },
#[error("NoSnapshot")]
NoSnapshot,
LongTermStorageSlotSkipped {
slot: Slot,
},
KeyExcludedFromSecondaryIndex {
index_key: String,
},
#[error("LongTermStorageSlotSkipped")]
LongTermStorageSlotSkipped { slot: Slot },
#[error("KeyExcludedFromSecondaryIndex")]
KeyExcludedFromSecondaryIndex { index_key: String },
#[error("TransactionHistoryNotAvailable")]
TransactionHistoryNotAvailable,
#[error("ScanError")]
ScanError { message: String },
}
#[derive(Debug, Serialize, Deserialize)]
@ -141,6 +146,11 @@ impl From<RpcCustomError> for Error {
message: "Transaction history is not available from this node".to_string(),
data: None,
},
RpcCustomError::ScanError { message } => Self {
code: ErrorCode::ServerError(JSON_RPC_SCAN_ERROR),
message,
data: None,
},
}
}
}

View File

@ -17,7 +17,12 @@ use solana_ledger::{
};
use solana_measure::measure::Measure;
use solana_runtime::{bank::Bank, bank_forks::BankForks, contains::Contains};
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
use solana_sdk::{
clock::{BankId, Slot},
epoch_schedule::EpochSchedule,
pubkey::Pubkey,
timing::timestamp,
};
use std::{
collections::{HashMap, HashSet},
iter::Iterator,
@ -559,7 +564,7 @@ impl RepairService {
#[allow(dead_code)]
fn process_new_duplicate_slots(
new_duplicate_slots: &[Slot],
new_duplicate_slots: &[(Slot, BankId)],
duplicate_slot_repair_statuses: &mut HashMap<Slot, DuplicateSlotRepairStatus>,
cluster_slots: &ClusterSlots,
root_bank: &Bank,
@ -568,7 +573,7 @@ impl RepairService {
duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
repair_validators: &Option<HashSet<Pubkey>>,
) {
for slot in new_duplicate_slots {
for (slot, bank_id) in new_duplicate_slots {
warn!(
"Cluster confirmed slot: {}, dumping our current version and repairing",
slot
@ -577,7 +582,7 @@ impl RepairService {
root_bank.clear_slot_signatures(*slot);
// Clear the accounts for this slot
root_bank.remove_unrooted_slots(&[*slot]);
root_bank.remove_unrooted_slots(&[(*slot, *bank_id)]);
// Clear the slot-related data in blockstore. This will:
// 1) Clear old shreds allowing new ones to be inserted
@ -1139,6 +1144,7 @@ mod test {
);
let bank0 = Arc::new(Bank::new(&genesis_config));
let bank9 = Bank::new_from_parent(&bank0, &Pubkey::default(), duplicate_slot);
let duplicate_bank_id = bank9.bank_id();
let old_balance = bank9.get_balance(&keypairs.node_keypair.pubkey());
bank9
.transfer(10_000, &mint_keypair, &keypairs.node_keypair.pubkey())
@ -1156,7 +1162,7 @@ mod test {
assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_some());
RepairService::process_new_duplicate_slots(
&[duplicate_slot],
&[(duplicate_slot, duplicate_bank_id)],
&mut duplicate_slot_repair_statuses,
&cluster_slots,
&bank9,

View File

@ -850,12 +850,6 @@ impl ReplayStage {
// Clear the duplicate banks from BankForks
{
let mut w_bank_forks = bank_forks.write().unwrap();
// Purging should have already been taken care of by logic
// in repair_service, so make sure drop implementation doesn't
// run
if let Some(b) = w_bank_forks.get(*d) {
b.skip_drop.store(true, Ordering::Relaxed)
}
w_bank_forks.remove(*d);
}
}

View File

@ -2041,6 +2041,7 @@ fn main() {
if remove_stake_accounts {
for (address, mut account) in bank
.get_program_accounts(&solana_stake_program::id())
.unwrap()
.into_iter()
{
account.set_lamports(0);
@ -2074,6 +2075,7 @@ fn main() {
// Delete existing vote accounts
for (address, mut account) in bank
.get_program_accounts(&solana_vote_program::id())
.unwrap()
.into_iter()
{
account.set_lamports(0);
@ -2235,6 +2237,7 @@ fn main() {
let accounts: BTreeMap<_, _> = bank
.get_all_accounts_with_modified_slots()
.unwrap()
.into_iter()
.filter(|(pubkey, _account, _slot)| {
include_sysvars || !solana_sdk::sysvar::is_sysvar_id(pubkey)

View File

@ -92,6 +92,8 @@ use {
tokio::runtime::Runtime,
};
type RpcCustomResult<T> = std::result::Result<T, RpcCustomError>;
pub const MAX_REQUEST_PAYLOAD_SIZE: usize = 50 * (1 << 10); // 50kB
pub const PERFORMANCE_SAMPLES_LIMIT: usize = 720;
@ -705,18 +707,23 @@ impl JsonRpcRequestProcessor {
fn get_largest_accounts(
&self,
config: Option<RpcLargestAccountsConfig>,
) -> RpcResponse<Vec<RpcAccountBalance>> {
) -> RpcCustomResult<RpcResponse<Vec<RpcAccountBalance>>> {
let config = config.unwrap_or_default();
let bank = self.bank(config.commitment);
if let Some((slot, accounts)) = self.get_cached_largest_accounts(&config.filter) {
Response {
Ok(Response {
context: RpcResponseContext { slot },
value: accounts,
}
})
} else {
let (addresses, address_filter) = if let Some(filter) = config.clone().filter {
let non_circulating_supply = calculate_non_circulating_supply(&bank);
let non_circulating_supply =
calculate_non_circulating_supply(&bank).map_err(|e| {
RpcCustomError::ScanError {
message: e.to_string(),
}
})?;
let addresses = non_circulating_supply.accounts.into_iter().collect();
let address_filter = match filter {
RpcLargestAccountsFilter::Circulating => AccountAddressFilter::Exclude,
@ -728,6 +735,9 @@ impl JsonRpcRequestProcessor {
};
let accounts = bank
.get_largest_accounts(NUM_LARGEST_ACCOUNTS, &addresses, address_filter)
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?
.into_iter()
.map(|(address, lamports)| RpcAccountBalance {
address: address.to_string(),
@ -736,15 +746,21 @@ impl JsonRpcRequestProcessor {
.collect::<Vec<RpcAccountBalance>>();
self.set_cached_largest_accounts(&config.filter, bank.slot(), &accounts);
new_response(&bank, accounts)
Ok(new_response(&bank, accounts))
}
}
fn get_supply(&self, commitment: Option<CommitmentConfig>) -> RpcResponse<RpcSupply> {
fn get_supply(
&self,
commitment: Option<CommitmentConfig>,
) -> RpcCustomResult<RpcResponse<RpcSupply>> {
let bank = self.bank(commitment);
let non_circulating_supply = calculate_non_circulating_supply(&bank);
let non_circulating_supply =
calculate_non_circulating_supply(&bank).map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?;
let total_supply = bank.capitalization();
new_response(
Ok(new_response(
&bank,
RpcSupply {
total: total_supply,
@ -756,7 +772,7 @@ impl JsonRpcRequestProcessor {
.map(|pubkey| pubkey.to_string())
.collect(),
},
)
))
}
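Editor's note: the same `map_err` conversion from a scan failure into `RpcCustomError::ScanError` is repeated inline at every scan call site in this file. A minimal standalone sketch of that conversion, using local stand-ins for the crate types (`ScanError` and `RpcCustomError` below are models, not the actual solana-runtime/solana-rpc definitions):

    use std::fmt;

    // Local models of the real types (assumption: not the actual crate definitions).
    #[derive(Debug)]
    struct ScanError(String);

    impl fmt::Display for ScanError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            self.0.fmt(f)
        }
    }

    #[derive(Debug)]
    enum RpcCustomError {
        ScanError { message: String },
    }

    // One helper expresses the conversion the patch writes inline at each call site.
    fn scan_to_rpc_err<T>(result: Result<T, ScanError>) -> Result<T, RpcCustomError> {
        result.map_err(|e| RpcCustomError::ScanError {
            message: e.to_string(),
        })
    }

    fn main() {
        let failed: Result<(), _> = Err(ScanError("slot removed".to_string()));
        println!("{:?}", scan_to_rpc_err(failed));
    }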
fn get_vote_accounts(
@ -1738,7 +1754,7 @@ impl JsonRpcRequestProcessor {
bank: &Arc<Bank>,
program_id: &Pubkey,
filters: Vec<RpcFilterType>,
) -> Result<Vec<(Pubkey, AccountSharedData)>> {
) -> RpcCustomResult<Vec<(Pubkey, AccountSharedData)>> {
let filter_closure = |account: &AccountSharedData| {
filters.iter().all(|filter_type| match filter_type {
RpcFilterType::DataSize(size) => account.data().len() as u64 == *size,
@ -1753,21 +1769,26 @@ impl JsonRpcRequestProcessor {
if !self.config.account_indexes.include_key(program_id) {
return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
index_key: program_id.to_string(),
}
.into());
});
}
Ok(
bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(*program_id), |account| {
Ok(bank
.get_filtered_indexed_accounts(&IndexKey::ProgramId(*program_id), |account| {
// The program-id account index checks for Account owner on inclusion. However, due
// to the current AccountsDb implementation, an account may remain in storage as a
// zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later
// updates. We include the redundant filters here to avoid returning these
// accounts.
account.owner() == program_id && filter_closure(account)
}),
)
})
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?)
} else {
Ok(bank.get_filtered_program_accounts(program_id, filter_closure))
Ok(bank
.get_filtered_program_accounts(program_id, filter_closure)
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?)
}
}
@ -1777,7 +1798,7 @@ impl JsonRpcRequestProcessor {
bank: &Arc<Bank>,
owner_key: &Pubkey,
mut filters: Vec<RpcFilterType>,
) -> Result<Vec<(Pubkey, AccountSharedData)>> {
) -> RpcCustomResult<Vec<(Pubkey, AccountSharedData)>> {
// The by-owner accounts index checks for Token Account state and Owner address on
// inclusion. However, due to the current AccountsDb implementation, an account may remain
// in storage as a zero-lamport AccountSharedData::Default() after being wiped and reinitialized in
@ -1802,19 +1823,19 @@ impl JsonRpcRequestProcessor {
if !self.config.account_indexes.include_key(owner_key) {
return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
index_key: owner_key.to_string(),
}
.into());
});
}
Ok(bank.get_filtered_indexed_accounts(
&IndexKey::SplTokenOwner(*owner_key),
|account| {
Ok(bank
.get_filtered_indexed_accounts(&IndexKey::SplTokenOwner(*owner_key), |account| {
account.owner() == &spl_token_id_v2_0()
&& filters.iter().all(|filter_type| match filter_type {
RpcFilterType::DataSize(size) => account.data().len() as u64 == *size,
RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data()),
})
},
))
})
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?)
} else {
self.get_filtered_program_accounts(bank, &spl_token_id_v2_0(), filters)
}
@ -1826,7 +1847,7 @@ impl JsonRpcRequestProcessor {
bank: &Arc<Bank>,
mint_key: &Pubkey,
mut filters: Vec<RpcFilterType>,
) -> Result<Vec<(Pubkey, AccountSharedData)>> {
) -> RpcCustomResult<Vec<(Pubkey, AccountSharedData)>> {
// The by-mint accounts index checks for Token Account state and Mint address on inclusion.
// However, due to the current AccountsDb implementation, an account may remain in storage
// as a zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later
@ -1850,18 +1871,19 @@ impl JsonRpcRequestProcessor {
if !self.config.account_indexes.include_key(mint_key) {
return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
index_key: mint_key.to_string(),
}
.into());
});
}
Ok(
bank.get_filtered_indexed_accounts(&IndexKey::SplTokenMint(*mint_key), |account| {
Ok(bank
.get_filtered_indexed_accounts(&IndexKey::SplTokenMint(*mint_key), |account| {
account.owner() == &spl_token_id_v2_0()
&& filters.iter().all(|filter_type| match filter_type {
RpcFilterType::DataSize(size) => account.data().len() as u64 == *size,
RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data()),
})
}),
)
})
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?)
} else {
self.get_filtered_program_accounts(bank, &spl_token_id_v2_0(), filters)
}
@ -2872,7 +2894,7 @@ pub mod rpc_full {
config: Option<RpcLargestAccountsConfig>,
) -> Result<RpcResponse<Vec<RpcAccountBalance>>> {
debug!("get_largest_accounts rpc request received");
Ok(meta.get_largest_accounts(config))
Ok(meta.get_largest_accounts(config)?)
}
fn get_supply(
@ -2881,7 +2903,7 @@ pub mod rpc_full {
commitment: Option<CommitmentConfig>,
) -> Result<RpcResponse<RpcSupply>> {
debug!("get_supply rpc request received");
Ok(meta.get_supply(commitment))
Ok(meta.get_supply(commitment)?)
}
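Editor's note: from a client's perspective the new failure mode surfaces as JSON-RPC server error -32012. Since it only occurs when the scanned fork was dumped as a duplicate, retrying against a fresher bank is a reasonable response. A hypothetical retry wrapper (illustrative only, not the solana-client API):

    const JSON_RPC_SCAN_ERROR: i64 = -32012; // matches the constant added above

    // `call` stands in for any RPC method that can fail with a server error code.
    fn with_scan_retry<T>(
        mut call: impl FnMut() -> Result<T, i64>,
        max_retries: usize,
    ) -> Result<T, i64> {
        let mut attempts = 0;
        loop {
            match call() {
                // Scan aborts are transient: the queried fork was removed.
                Err(JSON_RPC_SCAN_ERROR) if attempts < max_retries => attempts += 1,
                other => return other,
            }
        }
    }

    fn main() {
        let mut failures_left = 2;
        let result = with_scan_retry(
            || {
                if failures_left > 0 {
                    failures_left -= 1;
                    Err(JSON_RPC_SCAN_ERROR)
                } else {
                    Ok("accounts")
                }
            },
            3,
        );
        assert_eq!(result, Ok("accounts"));
    }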
fn request_airdrop(

View File

@ -246,6 +246,7 @@ fn process_rest(bank_forks: &Arc<RwLock<BankForks>>, path: &str) -> Option<String>
let total_supply = bank.capitalization();
let non_circulating_supply =
solana_runtime::non_circulating_supply::calculate_non_circulating_supply(&bank)
.expect("Scan should not error on root banks")
.lamports;
Some(format!(
"{}",

View File

@ -260,7 +260,13 @@ fn bench_concurrent_read_write(bencher: &mut Bencher) {
fn bench_concurrent_scan_write(bencher: &mut Bencher) {
store_accounts_with_possible_contention("concurrent_scan_write", bencher, |accounts, _| loop {
test::black_box(
accounts.load_by_program(&Ancestors::default(), AccountSharedData::default().owner()),
accounts
.load_by_program(
&Ancestors::default(),
0,
AccountSharedData::default().owner(),
)
.unwrap(),
);
})
}
@ -389,9 +395,11 @@ fn bench_load_largest_accounts(b: &mut Bencher) {
accounts.store_slow_uncached(0, &pubkey, &account);
}
let ancestors = Ancestors::from(vec![0]);
let bank_id = 0;
b.iter(|| {
accounts.load_largest_accounts(
&ancestors,
bank_id,
20,
&HashSet::new(),
AccountAddressFilter::Exclude,

View File

@ -3,7 +3,7 @@ use crate::{
AccountShrinkThreshold, AccountsDb, BankHashInfo, ErrorCounters, LoadHint, LoadedAccount,
ScanStorageResult,
},
accounts_index::{AccountSecondaryIndexes, IndexKey},
accounts_index::{AccountSecondaryIndexes, IndexKey, ScanResult},
ancestors::Ancestors,
bank::{
NonceRollbackFull, NonceRollbackInfo, RentDebits, TransactionCheckResult,
@ -23,7 +23,7 @@ use solana_sdk::{
account::{Account, AccountSharedData, ReadableAccount, WritableAccount},
account_utils::StateMut,
bpf_loader_upgradeable::{self, UpgradeableLoaderState},
clock::{Slot, INITIAL_RENT_EPOCH},
clock::{BankId, Slot, INITIAL_RENT_EPOCH},
feature_set::{self, FeatureSet},
fee_calculator::{FeeCalculator, FeeConfig},
genesis_config::ClusterType,
@ -585,15 +585,17 @@ impl Accounts {
pub fn load_largest_accounts(
&self,
ancestors: &Ancestors,
bank_id: BankId,
num: usize,
filter_by_address: &HashSet<Pubkey>,
filter: AccountAddressFilter,
) -> Vec<(Pubkey, u64)> {
) -> ScanResult<Vec<(Pubkey, u64)>> {
if num == 0 {
return vec![];
return Ok(vec![]);
}
let account_balances = self.accounts_db.scan_accounts(
ancestors,
bank_id,
|collector: &mut BinaryHeap<Reverse<(u64, Pubkey)>>, option| {
if let Some((pubkey, account, _slot)) = option {
if account.lamports() == 0 {
@ -619,12 +621,12 @@ impl Accounts {
collector.push(Reverse((account.lamports(), *pubkey)));
}
},
);
account_balances
)?;
Ok(account_balances
.into_sorted_vec()
.into_iter()
.map(|Reverse((balance, pubkey))| (pubkey, balance))
.collect()
.collect())
}
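Editor's note: the collector closure above (partially elided by the diff context) maintains a bounded min-heap: `Reverse` turns `BinaryHeap`'s max-heap into a min-heap, so the smallest of the current top `num` entries sits at the root and is evicted when a larger one arrives. A standalone sketch of the pattern, with `u64` stand-ins for `Pubkey`:

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    fn top_n(balances: &[(u64, u64)], num: usize) -> Vec<(u64, u64)> {
        if num == 0 {
            return vec![];
        }
        let mut heap: BinaryHeap<Reverse<(u64, u64)>> = BinaryHeap::new();
        for &(lamports, key) in balances {
            if heap.len() == num {
                // Root of the min-heap is the smallest of the current top `num`.
                let Reverse(min) = *heap.peek().unwrap();
                if (lamports, key) > min {
                    heap.pop();
                    heap.push(Reverse((lamports, key)));
                }
            } else {
                heap.push(Reverse((lamports, key)));
            }
        }
        // Ascending order of Reverse(..) is descending (balance, key) order.
        heap.into_sorted_vec()
            .into_iter()
            .map(|Reverse((balance, key))| (key, balance))
            .collect()
    }

    fn main() {
        let balances = [(42, 1), (42, 0), (41, 2), (7, 3)];
        assert_eq!(top_n(&balances, 3), vec![(1, 42), (0, 42), (2, 41)]);
    }

This matches the ordering the tests below expect: sorted by balance, then pubkey, descending.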
pub fn calculate_capitalization(
@ -687,10 +689,12 @@ impl Accounts {
pub fn load_by_program(
&self,
ancestors: &Ancestors,
bank_id: BankId,
program_id: &Pubkey,
) -> Vec<(Pubkey, AccountSharedData)> {
) -> ScanResult<Vec<(Pubkey, AccountSharedData)>> {
self.accounts_db.scan_accounts(
ancestors,
bank_id,
|collector: &mut Vec<(Pubkey, AccountSharedData)>, some_account_tuple| {
Self::load_while_filtering(collector, some_account_tuple, |account| {
account.owner() == program_id
@ -702,11 +706,13 @@ impl Accounts {
pub fn load_by_program_with_filter<F: Fn(&AccountSharedData) -> bool>(
&self,
ancestors: &Ancestors,
bank_id: BankId,
program_id: &Pubkey,
filter: F,
) -> Vec<(Pubkey, AccountSharedData)> {
) -> ScanResult<Vec<(Pubkey, AccountSharedData)>> {
self.accounts_db.scan_accounts(
ancestors,
bank_id,
|collector: &mut Vec<(Pubkey, AccountSharedData)>, some_account_tuple| {
Self::load_while_filtering(collector, some_account_tuple, |account| {
account.owner() == program_id && filter(account)
@ -718,12 +724,14 @@ impl Accounts {
pub fn load_by_index_key_with_filter<F: Fn(&AccountSharedData) -> bool>(
&self,
ancestors: &Ancestors,
bank_id: BankId,
index_key: &IndexKey,
filter: F,
) -> Vec<(Pubkey, AccountSharedData)> {
) -> ScanResult<Vec<(Pubkey, AccountSharedData)>> {
self.accounts_db
.index_scan_accounts(
ancestors,
bank_id,
*index_key,
|collector: &mut Vec<(Pubkey, AccountSharedData)>, some_account_tuple| {
Self::load_while_filtering(collector, some_account_tuple, |account| {
@ -731,16 +739,21 @@ impl Accounts {
})
},
)
.0
.map(|result| result.0)
}
pub fn account_indexes_include_key(&self, key: &Pubkey) -> bool {
self.accounts_db.account_indexes.include_key(key)
}
pub fn load_all(&self, ancestors: &Ancestors) -> Vec<(Pubkey, AccountSharedData, Slot)> {
pub fn load_all(
&self,
ancestors: &Ancestors,
bank_id: BankId,
) -> ScanResult<Vec<(Pubkey, AccountSharedData, Slot)>> {
self.accounts_db.scan_accounts(
ancestors,
bank_id,
|collector: &mut Vec<(Pubkey, AccountSharedData, Slot)>, some_account_tuple| {
if let Some((pubkey, account, slot)) = some_account_tuple
.filter(|(_, account, _)| Self::is_loadable(account.lamports()))
@ -931,8 +944,8 @@ impl Accounts {
/// Purge a slot if it is not a root
/// Root slots cannot be purged
/// `is_from_abs` is true if the caller is the AccountsBackgroundService
pub fn purge_slot(&self, slot: Slot, is_from_abs: bool) {
self.accounts_db.purge_slot(slot, is_from_abs);
pub fn purge_slot(&self, slot: Slot, bank_id: BankId, is_from_abs: bool) {
self.accounts_db.purge_slot(slot, bank_id, is_from_abs);
}
/// Add a slot to root. Root slots cannot be purged
@ -2580,108 +2593,160 @@ mod tests {
let all_pubkeys: HashSet<_> = vec![pubkey0, pubkey1, pubkey2].into_iter().collect();
// num == 0 should always return empty set
let bank_id = 0;
assert_eq!(
accounts.load_largest_accounts(
&ancestors,
0,
&HashSet::new(),
AccountAddressFilter::Exclude
),
accounts
.load_largest_accounts(
&ancestors,
bank_id,
0,
&HashSet::new(),
AccountAddressFilter::Exclude
)
.unwrap(),
vec![]
);
assert_eq!(
accounts.load_largest_accounts(
&ancestors,
0,
&all_pubkeys,
AccountAddressFilter::Include
),
accounts
.load_largest_accounts(
&ancestors,
bank_id,
0,
&all_pubkeys,
AccountAddressFilter::Include
)
.unwrap(),
vec![]
);
// list should be sorted by balance, then pubkey, descending
assert!(pubkey1 > pubkey0);
assert_eq!(
accounts.load_largest_accounts(
&ancestors,
1,
&HashSet::new(),
AccountAddressFilter::Exclude
),
accounts
.load_largest_accounts(
&ancestors,
bank_id,
1,
&HashSet::new(),
AccountAddressFilter::Exclude
)
.unwrap(),
vec![(pubkey1, 42)]
);
assert_eq!(
accounts.load_largest_accounts(
&ancestors,
2,
&HashSet::new(),
AccountAddressFilter::Exclude
),
accounts
.load_largest_accounts(
&ancestors,
bank_id,
2,
&HashSet::new(),
AccountAddressFilter::Exclude
)
.unwrap(),
vec![(pubkey1, 42), (pubkey0, 42)]
);
assert_eq!(
accounts.load_largest_accounts(
&ancestors,
3,
&HashSet::new(),
AccountAddressFilter::Exclude
),
accounts
.load_largest_accounts(
&ancestors,
bank_id,
3,
&HashSet::new(),
AccountAddressFilter::Exclude
)
.unwrap(),
vec![(pubkey1, 42), (pubkey0, 42), (pubkey2, 41)]
);
// larger num should not affect results
assert_eq!(
accounts.load_largest_accounts(
&ancestors,
6,
&HashSet::new(),
AccountAddressFilter::Exclude
),
accounts
.load_largest_accounts(
&ancestors,
bank_id,
6,
&HashSet::new(),
AccountAddressFilter::Exclude
)
.unwrap(),
vec![(pubkey1, 42), (pubkey0, 42), (pubkey2, 41)]
);
// AccountAddressFilter::Exclude should exclude entry
let exclude1: HashSet<_> = vec![pubkey1].into_iter().collect();
assert_eq!(
accounts.load_largest_accounts(&ancestors, 1, &exclude1, AccountAddressFilter::Exclude),
accounts
.load_largest_accounts(
&ancestors,
bank_id,
1,
&exclude1,
AccountAddressFilter::Exclude
)
.unwrap(),
vec![(pubkey0, 42)]
);
assert_eq!(
accounts.load_largest_accounts(&ancestors, 2, &exclude1, AccountAddressFilter::Exclude),
accounts
.load_largest_accounts(
&ancestors,
bank_id,
2,
&exclude1,
AccountAddressFilter::Exclude
)
.unwrap(),
vec![(pubkey0, 42), (pubkey2, 41)]
);
assert_eq!(
accounts.load_largest_accounts(&ancestors, 3, &exclude1, AccountAddressFilter::Exclude),
accounts
.load_largest_accounts(
&ancestors,
bank_id,
3,
&exclude1,
AccountAddressFilter::Exclude
)
.unwrap(),
vec![(pubkey0, 42), (pubkey2, 41)]
);
// AccountAddressFilter::Include should limit entries
let include1_2: HashSet<_> = vec![pubkey1, pubkey2].into_iter().collect();
assert_eq!(
accounts.load_largest_accounts(
&ancestors,
1,
&include1_2,
AccountAddressFilter::Include
),
accounts
.load_largest_accounts(
&ancestors,
bank_id,
1,
&include1_2,
AccountAddressFilter::Include
)
.unwrap(),
vec![(pubkey1, 42)]
);
assert_eq!(
accounts.load_largest_accounts(
&ancestors,
2,
&include1_2,
AccountAddressFilter::Include
),
accounts
.load_largest_accounts(
&ancestors,
bank_id,
2,
&include1_2,
AccountAddressFilter::Include
)
.unwrap(),
vec![(pubkey1, 42), (pubkey2, 41)]
);
assert_eq!(
accounts.load_largest_accounts(
&ancestors,
3,
&include1_2,
AccountAddressFilter::Include
),
accounts
.load_largest_accounts(
&ancestors,
bank_id,
3,
&include1_2,
AccountAddressFilter::Include
)
.unwrap(),
vec![(pubkey1, 42), (pubkey2, 41)]
);
}

View File

@ -12,7 +12,10 @@ use crossbeam_channel::{Receiver, SendError, Sender};
use log::*;
use rand::{thread_rng, Rng};
use solana_measure::measure::Measure;
use solana_sdk::{clock::Slot, hash::Hash};
use solana_sdk::{
clock::{BankId, Slot},
hash::Hash,
};
use std::{
boxed::Box,
fmt::{Debug, Formatter},
@ -39,8 +42,8 @@ const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS: u64 = crate::accounts_db::EXPIRATI
pub type SnapshotRequestSender = Sender<SnapshotRequest>;
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
pub type DroppedSlotsSender = Sender<Slot>;
pub type DroppedSlotsReceiver = Receiver<Slot>;
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
#[derive(Clone)]
pub struct SendDroppedBankCallback {
@ -49,7 +52,7 @@ pub struct SendDroppedBankCallback {
impl DropCallback for SendDroppedBankCallback {
fn callback(&self, bank: &Bank) {
if let Err(e) = self.sender.send(bank.slot()) {
if let Err(e) = self.sender.send((bank.slot(), bank.bank_id())) {
warn!("Error sending dropped banks: {:?}", e);
}
}
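Editor's note: a minimal standalone model of the changed channel protocol. The drop callback now publishes `(Slot, BankId)` pairs, and the background handler purges with both values (std `mpsc` and the local names here are illustrative; the real code uses crossbeam channels and `AccountsDb`):

    use std::sync::mpsc::{channel, Receiver};

    type Slot = u64;
    type BankId = u64;

    struct Purger;

    impl Purger {
        fn purge_slot(&self, slot: Slot, bank_id: BankId, _is_from_abs: bool) {
            println!("purging slot {} (bank id {})", slot, bank_id);
        }
    }

    fn handle_pruned_banks(receiver: &Receiver<(Slot, BankId)>, purger: &Purger) -> usize {
        let mut count = 0;
        // Drain whatever banks have been dropped so far, like `try_iter()` above.
        for (pruned_slot, pruned_bank_id) in receiver.try_iter() {
            count += 1;
            purger.purge_slot(pruned_slot, pruned_bank_id, true);
        }
        count
    }

    fn main() {
        let (sender, receiver) = channel();
        // What `SendDroppedBankCallback::callback` would send for slot 5 / bank id 7:
        sender.send((5 as Slot, 7 as BankId)).unwrap();
        assert_eq!(handle_pruned_banks(&receiver, &Purger), 1);
    }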
@ -273,9 +276,11 @@ impl AbsRequestHandler {
/// `is_from_abs` is true if the caller is the AccountsBackgroundService
pub fn handle_pruned_banks(&self, bank: &Bank, is_from_abs: bool) -> usize {
let mut count = 0;
for pruned_slot in self.pruned_banks_receiver.try_iter() {
for (pruned_slot, pruned_bank_id) in self.pruned_banks_receiver.try_iter() {
count += 1;
bank.rc.accounts.purge_slot(pruned_slot, is_from_abs);
bank.rc
.accounts
.purge_slot(pruned_slot, pruned_bank_id, is_from_abs);
}
count
@ -465,7 +470,7 @@ mod test {
&AccountSharedData::new(264, 0, &Pubkey::default()),
);
assert!(bank0.get_account(&account_key).is_some());
pruned_banks_sender.send(0).unwrap();
pruned_banks_sender.send((0, 0)).unwrap();
assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());

View File

@ -24,7 +24,7 @@ use crate::{
accounts_hash::{AccountsHash, CalculateHashIntermediate, HashStats, PreviousPass},
accounts_index::{
AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexRootsStats,
IndexKey, IsCached, SlotList, SlotSlice, ZeroLamport,
IndexKey, IsCached, ScanResult, SlotList, SlotSlice, ZeroLamport,
},
ancestors::Ancestors,
append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion},
@ -48,7 +48,7 @@ use solana_measure::measure::Measure;
use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::{
account::{AccountSharedData, ReadableAccount},
clock::{Epoch, Slot},
clock::{BankId, Epoch, Slot},
genesis_config::ClusterType,
hash::{Hash, Hasher},
pubkey::Pubkey,
@ -1506,7 +1506,7 @@ impl AccountsDb {
};
if no_delete {
let mut pending_store_ids: HashSet<usize> = HashSet::new();
for (_slot_id, account_info) in account_infos {
for (_bank_id, account_info) in account_infos {
if !already_counted.contains(&account_info.store_id) {
pending_store_ids.insert(account_info.store_id);
}
@ -2420,21 +2420,29 @@ impl AccountsDb {
}
}
pub fn scan_accounts<F, A>(&self, ancestors: &Ancestors, scan_func: F) -> A
pub fn scan_accounts<F, A>(
&self,
ancestors: &Ancestors,
bank_id: BankId,
scan_func: F,
) -> ScanResult<A>
where
F: Fn(&mut A, Option<(&Pubkey, AccountSharedData, Slot)>),
A: Default,
{
let mut collector = A::default();
// This can error out if the slots being scanned over are aborted
self.accounts_index
.scan_accounts(ancestors, |pubkey, (account_info, slot)| {
.scan_accounts(ancestors, bank_id, |pubkey, (account_info, slot)| {
let account_slot = self
.get_account_accessor(slot, pubkey, account_info.store_id, account_info.offset)
.get_loaded_account()
.map(|loaded_account| (pubkey, loaded_account.take_account(), slot));
scan_func(&mut collector, account_slot)
});
collector
})?;
Ok(collector)
}
pub fn unchecked_scan_accounts<F, A>(
@ -2504,9 +2512,10 @@ impl AccountsDb {
pub fn index_scan_accounts<F, A>(
&self,
ancestors: &Ancestors,
bank_id: BankId,
index_key: IndexKey,
scan_func: F,
) -> (A, bool)
) -> ScanResult<(A, bool)>
where
F: Fn(&mut A, Option<(&Pubkey, AccountSharedData, Slot)>),
A: Default,
@ -2519,12 +2528,14 @@ impl AccountsDb {
if !self.account_indexes.include_key(key) {
// the requested key was not indexed in the secondary index, so do a normal scan
let used_index = false;
return (self.scan_accounts(ancestors, scan_func), used_index);
let scan_result = self.scan_accounts(ancestors, bank_id, scan_func)?;
return Ok((scan_result, used_index));
}
let mut collector = A::default();
self.accounts_index.index_scan_accounts(
ancestors,
bank_id,
index_key,
|pubkey, (account_info, slot)| {
let account_slot = self
@ -2533,9 +2544,9 @@ impl AccountsDb {
.map(|loaded_account| (pubkey, loaded_account.take_account(), slot));
scan_func(&mut collector, account_slot)
},
);
)?;
let used_index = true;
(collector, used_index)
Ok((collector, used_index))
}
/// Scan a specific slot through all the account storage in parallel
@ -3284,14 +3295,29 @@ impl AccountsDb {
SendDroppedBankCallback::new(pruned_banks_sender)
}
/// This should only be called after the `Bank::drop()` runs in bank.rs, See BANK_DROP_SAFETY
/// comment below for more explanation.
/// `is_from_abs` is true if the caller is the AccountsBackgroundService
pub fn purge_slot(&self, slot: Slot, is_from_abs: bool) {
pub fn purge_slot(&self, slot: Slot, bank_id: BankId, is_from_abs: bool) {
if self.is_bank_drop_callback_enabled.load(Ordering::SeqCst) && !is_from_abs {
panic!("bad drop callpath detected; Bank::drop() must run serially with other logic in ABS like clean_accounts()")
}
let mut slots = HashSet::new();
slots.insert(slot);
self.purge_slots(&slots);
// BANK_DROP_SAFETY: Because this function only runs once the bank is dropped,
// we know that there are no longer any ongoing scans on this bank, because scans require
// and hold a reference to the bank at the tip of the fork they're scanning. Hence it's
// safe to remove this bank_id from the `removed_bank_ids` list at this point.
if self
.accounts_index
.removed_bank_ids
.lock()
.unwrap()
.remove(&bank_id)
{
// If this slot was already cleaned up, no need to do any further cleans
return;
}
self.purge_slots(std::iter::once(&slot));
}
fn recycle_slot_stores(
@ -3325,7 +3351,7 @@ impl AccountsDb {
/// Purges every slot in `removed_slots` from both the cache and storage. This includes
/// entries in the accounts index, cache entries, and any backing storage entries.
fn purge_slots_from_cache_and_store<'a>(
&'a self,
&self,
removed_slots: impl Iterator<Item = &'a Slot>,
purge_stats: &PurgeStats,
) {
@ -3336,7 +3362,11 @@ impl AccountsDb {
// This function is only currently safe with respect to `flush_slot_cache()` because
// both functions run serially in AccountsBackgroundService.
let mut remove_cache_elapsed = Measure::start("remove_cache_elapsed");
if let Some(slot_cache) = self.accounts_cache.remove_slot(*remove_slot) {
// Note: we cannot remove this slot from the slot cache until we've first removed its
// entries from the accounts index. This is because `scan_accounts()` relies on
// holding the index lock, finding the index entry, and then looking up the entry
// in the cache. If it fails to find that entry, it will panic in `get_loaded_account()`
if let Some(slot_cache) = self.accounts_cache.slot_cache(*remove_slot) {
// If the slot is still in the cache, remove the backing storages for
// the slot and from the Accounts Index
num_cached_slots_removed += 1;
@ -3344,6 +3374,8 @@ impl AccountsDb {
self.purge_slot_cache(*remove_slot, slot_cache);
remove_cache_elapsed.stop();
remove_cache_elapsed_across_slots += remove_cache_elapsed.as_us();
// Nobody else should have removed the slot cache entry yet
assert!(self.accounts_cache.remove_slot(*remove_slot).is_some());
} else {
self.purge_slot_storage(*remove_slot, purge_stats);
}
@ -3531,39 +3563,33 @@ impl AccountsDb {
}
#[allow(clippy::needless_collect)]
fn purge_slots(&self, slots: &HashSet<Slot>) {
fn purge_slots<'a>(&self, slots: impl Iterator<Item = &'a Slot>) {
// `add_root()` should be called first
let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
let non_roots: Vec<&Slot> = slots
.iter()
.filter(|slot| !self.accounts_index.is_root(**slot))
.collect();
let non_roots = slots
// Only safe to check when there are duplicate versions of a slot
// because ReplayStage will not make new roots before dumping the
// duplicate slots first. Thus we will not be in a case where we
// root slot `S` and then try to dump some other version of slot `S`;
// the dumping has to finish first
//
// Also note roots are never removed via `remove_unrooted_slot()`, so
// it's safe to filter them out here as they won't need deletion from
// self.accounts_index.removed_bank_ids in `purge_slots_from_cache_and_store()`.
.filter(|slot| !self.accounts_index.is_root(**slot));
safety_checks_elapsed.stop();
self.external_purge_slots_stats
.safety_checks_elapsed
.fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
self.purge_slots_from_cache_and_store(
non_roots.into_iter(),
&self.external_purge_slots_stats,
);
self.purge_slots_from_cache_and_store(non_roots, &self.external_purge_slots_stats);
self.external_purge_slots_stats
.report("external_purge_slots_stats", Some(1000));
}
// TODO: This is currently unsafe with scan because it can remove a slot in the middle
/// Remove the set of slots in `remove_slots` from both the cache and storage. This requires
/// we know the contents of the slot are either:
///
/// 1) Completely in the cache
/// 2) Have been completely flushed from the cache
///
/// in order to guarantee that when this function returns, the contents of the slot have
/// been completely removed, not merely partially removed. Thus synchronization with `flush_slot_cache()`
/// through `self.remove_unrooted_slots_synchronization` is necessary.
pub fn remove_unrooted_slots(&self, remove_slots: &[Slot]) {
pub fn remove_unrooted_slots(&self, remove_slots: &[(Slot, BankId)]) {
let rooted_slots = self
.accounts_index
.get_rooted_from_list(remove_slots.iter());
.get_rooted_from_list(remove_slots.iter().map(|(slot, _)| slot));
assert!(
rooted_slots.is_empty(),
"Trying to remove accounts for rooted slots {:?}",
@ -3583,7 +3609,7 @@ impl AccountsDb {
// we want to remove in this function
let mut remaining_contended_flush_slots: Vec<Slot> = remove_slots
.iter()
.filter(|remove_slot| {
.filter_map(|(remove_slot, _)| {
let is_being_flushed = currently_contended_slots.contains(remove_slot);
if !is_being_flushed {
// Reserve the slots that we want to purge that aren't currently
@ -3594,10 +3620,10 @@ impl AccountsDb {
// before another version of the same slot can be replayed. This means
// multiple threads should not call `remove_unrooted_slots()` simultaneously
// with the same slot.
currently_contended_slots.insert(**remove_slot);
currently_contended_slots.insert(*remove_slot);
}
// If the cache is currently flushing this slot, add it to the list
is_being_flushed
Some(remove_slot).filter(|_| is_being_flushed)
})
.cloned()
.collect();
@ -3630,13 +3656,26 @@ impl AccountsDb {
}
}
// Mark that these slots are about to be purged so that new attempts to scan these
// banks fail, and any ongoing scans over these slots will detect that they should abort
// their results
{
let mut locked_removed_bank_ids = self.accounts_index.removed_bank_ids.lock().unwrap();
for (_slot, remove_bank_id) in remove_slots.iter() {
locked_removed_bank_ids.insert(*remove_bank_id);
}
}
let remove_unrooted_purge_stats = PurgeStats::default();
self.purge_slots_from_cache_and_store(remove_slots.iter(), &remove_unrooted_purge_stats);
self.purge_slots_from_cache_and_store(
remove_slots.iter().map(|(slot, _)| slot),
&remove_unrooted_purge_stats,
);
remove_unrooted_purge_stats.report("remove_unrooted_slots_purge_slots_stats", Some(0));
let mut currently_contended_slots = slots_under_contention.lock().unwrap();
for slot in remove_slots {
assert!(currently_contended_slots.remove(slot));
for (remove_slot, _) in remove_slots {
assert!(currently_contended_slots.remove(remove_slot));
}
}
@ -6945,6 +6984,7 @@ pub mod tests {
fn run_test_remove_unrooted_slot(is_cached: bool) {
let unrooted_slot = 9;
let unrooted_bank_id = 9;
let mut db = AccountsDb::new(Vec::new(), &ClusterType::Development);
db.caching_enabled = true;
let key = Pubkey::default();
@ -6966,7 +7006,7 @@ pub mod tests {
assert_load_account(&db, unrooted_slot, key, 1);
// Purge the slot
db.remove_unrooted_slots(&[unrooted_slot]);
db.remove_unrooted_slots(&[(unrooted_slot, unrooted_bank_id)]);
assert!(db.load_without_fixed_root(&ancestors, &key).is_none());
assert!(db.bank_hashes.read().unwrap().get(&unrooted_slot).is_none());
assert!(db.accounts_cache.slot_cache(unrooted_slot).is_none());
@ -6997,13 +7037,14 @@ pub mod tests {
fn test_remove_unrooted_slot_snapshot() {
solana_logger::setup();
let unrooted_slot = 9;
let unrooted_bank_id = 9;
let db = AccountsDb::new(Vec::new(), &ClusterType::Development);
let key = solana_sdk::pubkey::new_rand();
let account0 = AccountSharedData::new(1, 0, &key);
db.store_uncached(unrooted_slot, &[(&key, &account0)]);
// Purge the slot
db.remove_unrooted_slots(&[unrooted_slot]);
db.remove_unrooted_slots(&[(unrooted_slot, unrooted_bank_id)]);
// Add a new root
let key2 = solana_sdk::pubkey::new_rand();
@ -7628,11 +7669,13 @@ pub mod tests {
// Secondary index should still find both pubkeys
let mut found_accounts = HashSet::new();
let index_key = IndexKey::SplTokenMint(mint_key);
let bank_id = 0;
accounts
.accounts_index
.index_scan_accounts(&Ancestors::default(), index_key, |key, _| {
.index_scan_accounts(&Ancestors::default(), bank_id, index_key, |key, _| {
found_accounts.insert(*key);
});
})
.unwrap();
assert_eq!(found_accounts.len(), 2);
assert!(found_accounts.contains(&pubkey1));
assert!(found_accounts.contains(&pubkey2));
@ -7643,13 +7686,16 @@ pub mod tests {
keys: [mint_key].iter().cloned().collect::<HashSet<Pubkey>>(),
});
// Secondary index can't be used - do normal scan: should still find both pubkeys
let found_accounts = accounts.index_scan_accounts(
&Ancestors::default(),
index_key,
|collection: &mut HashSet<Pubkey>, account| {
collection.insert(*account.unwrap().0);
},
);
let found_accounts = accounts
.index_scan_accounts(
&Ancestors::default(),
bank_id,
index_key,
|collection: &mut HashSet<Pubkey>, account| {
collection.insert(*account.unwrap().0);
},
)
.unwrap();
assert!(!found_accounts.1);
assert_eq!(found_accounts.0.len(), 2);
assert!(found_accounts.0.contains(&pubkey1));
@ -7658,13 +7704,16 @@ pub mod tests {
accounts.account_indexes.keys = None;
// Secondary index can now be used since it isn't marked as excluded
let found_accounts = accounts.index_scan_accounts(
&Ancestors::default(),
index_key,
|collection: &mut HashSet<Pubkey>, account| {
collection.insert(*account.unwrap().0);
},
);
let found_accounts = accounts
.index_scan_accounts(
&Ancestors::default(),
bank_id,
index_key,
|collection: &mut HashSet<Pubkey>, account| {
collection.insert(*account.unwrap().0);
},
)
.unwrap();
assert!(found_accounts.1);
assert_eq!(found_accounts.0.len(), 2);
assert!(found_accounts.0.contains(&pubkey1));
@ -7689,11 +7738,15 @@ pub mod tests {
// Secondary index should have purged `pubkey1` as well
let mut found_accounts = vec![];
accounts.accounts_index.index_scan_accounts(
&Ancestors::default(),
IndexKey::SplTokenMint(mint_key),
|key, _| found_accounts.push(*key),
);
accounts
.accounts_index
.index_scan_accounts(
&Ancestors::default(),
bank_id,
IndexKey::SplTokenMint(mint_key),
|key, _| found_accounts.push(*key),
)
.unwrap();
assert_eq!(found_accounts, vec![pubkey2]);
}
@ -10167,6 +10220,7 @@ pub mod tests {
fn setup_scan(
db: Arc<AccountsDb>,
scan_ancestors: Arc<Ancestors>,
bank_id: BankId,
stall_key: Pubkey,
) -> ScanTracker {
let exit = Arc::new(AtomicBool::new(false));
@ -10179,6 +10233,7 @@ pub mod tests {
.spawn(move || {
db.scan_accounts(
&scan_ancestors,
bank_id,
|_collector: &mut Vec<(Pubkey, AccountSharedData)>, maybe_account| {
ready_.store(true, Ordering::Relaxed);
if let Some((pubkey, _, _)) = maybe_account {
@ -10193,7 +10248,8 @@ pub mod tests {
}
}
},
);
)
.unwrap();
})
.unwrap();
@ -10239,7 +10295,8 @@ pub mod tests {
let max_scan_root = 0;
db.add_root(max_scan_root);
let scan_ancestors: Arc<Ancestors> = Arc::new(vec![(0, 1), (1, 1)].into_iter().collect());
let scan_tracker = setup_scan(db.clone(), scan_ancestors.clone(), account_key2);
let bank_id = 0;
let scan_tracker = setup_scan(db.clone(), scan_ancestors.clone(), bank_id, account_key2);
// Add a new root 2
let new_root = 2;
@ -10299,7 +10356,8 @@ pub mod tests {
assert_eq!(account.0.lamports(), slot1_account.lamports());
// Simulate dropping the bank, which finally removes the slot from the cache
db.purge_slot(1, false);
let bank_id = 1;
db.purge_slot(1, bank_id, false);
assert!(db
.do_load(
&scan_ancestors,
@ -10404,7 +10462,13 @@ pub mod tests {
accounts_db.add_root(*slot as Slot);
if Some(*slot) == scan_slot {
let ancestors = Arc::new(vec![(stall_slot, 1), (*slot, 1)].into_iter().collect());
scan_tracker = Some(setup_scan(accounts_db.clone(), ancestors, scan_stall_key));
let bank_id = 0;
scan_tracker = Some(setup_scan(
accounts_db.clone(),
ancestors,
bank_id,
scan_stall_key,
));
assert_eq!(
accounts_db.accounts_index.min_ongoing_scan_root().unwrap(),
*slot
@ -11237,7 +11301,12 @@ pub mod tests {
account.set_lamports(lamports);
// Pick random 50% of the slots to pass to `remove_unrooted_slots()`
let mut all_slots: Vec<Slot> = (0..num_cached_slots).collect();
let mut all_slots: Vec<(Slot, BankId)> = (0..num_cached_slots)
.map(|slot| {
let bank_id = slot + 1;
(slot, bank_id)
})
.collect();
all_slots.shuffle(&mut rand::thread_rng());
let slots_to_dump = &all_slots[0..num_cached_slots as usize / 2];
let slots_to_keep = &all_slots[num_cached_slots as usize / 2..];
@ -11270,7 +11339,7 @@ pub mod tests {
// Check that all the slots in `slots_to_dump` were completely removed from the
// cache, storage, and index
for slot in slots_to_dump {
for (slot, _) in slots_to_dump {
assert!(db.storage.get_slot_storage_entries(*slot).is_none());
assert!(db.accounts_cache.slot_cache(*slot).is_none());
let account_in_slot = slot_to_pubkey_map[slot];
@ -11283,7 +11352,7 @@ pub mod tests {
// Wait for flush to finish before starting next trial
flush_done_receiver.recv().unwrap();
for slot in slots_to_keep {
for (slot, bank_id) in slots_to_keep {
let account_in_slot = slot_to_pubkey_map[slot];
assert!(db
.load(
@ -11294,7 +11363,7 @@ pub mod tests {
.is_some());
// Clear for next iteration so that `assert!(self.storage.get_slot_stores(purged_slot).is_none());`
// in `purge_slot_pubkeys()` doesn't trigger
db.remove_unrooted_slots(&[*slot]);
db.remove_unrooted_slots(&[(*slot, *bank_id)]);
}
}

View File

@ -10,7 +10,7 @@ use log::*;
use ouroboros::self_referencing;
use solana_measure::measure::Measure;
use solana_sdk::{
clock::Slot,
clock::{BankId, Slot},
pubkey::{Pubkey, PUBKEY_BYTES},
};
use std::{
@ -25,15 +25,16 @@ use std::{
},
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock, RwLockReadGuard, RwLockWriteGuard,
Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
},
};
use thiserror::Error;
pub const ITER_BATCH_SIZE: usize = 1000;
pub type ScanResult<T> = Result<T, ScanError>;
pub type SlotList<T> = Vec<(Slot, T)>;
pub type SlotSlice<'s, T> = &'s [(Slot, T)];
pub type RefCount = u64;
pub type AccountMap<K, V> = BTreeMap<K, V>;
@ -55,6 +56,12 @@ impl IsCached for u64 {
}
}
#[derive(Error, Debug, PartialEq)]
pub enum ScanError {
#[error("Node detected it replayed bad version of slot {slot:?} with id {bank_id:?}, thus the scan on said slot was aborted")]
SlotRemoved { slot: Slot, bank_id: BankId },
}
enum ScanTypes<R: RangeBounds<Pubkey>> {
Unindexed(Option<R>),
Indexed(IndexKey),
@ -458,6 +465,10 @@ impl RollingBitField {
std::mem::swap(&mut n, self);
}
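/// `max` is one past the largest value ever inserted; callers such as
/// `Ancestors::max_slot()` (added in this commit) subtract 1 to recover
/// the actual maximum.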
pub fn max(&self) -> u64 {
self.max
}
pub fn get_all(&self) -> Vec<u64> {
let mut all = Vec::with_capacity(self.count);
self.excess.iter().for_each(|slot| all.push(*slot));
@ -584,6 +595,22 @@ type MapType<T> = AccountMap<Pubkey, AccountMapEntry<T>>;
type AccountMapsWriteLock<'a, T> = RwLockWriteGuard<'a, AccountMap<Pubkey, AccountMapEntry<T>>>;
type AccountMapsReadLock<'a, T> = RwLockReadGuard<'a, AccountMap<Pubkey, AccountMapEntry<T>>>;
#[derive(Debug, Default)]
pub struct ScanSlotTracker {
is_removed: bool,
ref_count: u64,
}
impl ScanSlotTracker {
pub fn is_removed(&self) -> bool {
self.is_removed
}
pub fn mark_removed(&mut self) {
self.is_removed = true;
}
}
#[derive(Debug)]
pub struct AccountsIndex<T> {
pub account_maps: RwLock<MapType<T>>,
@ -592,6 +619,17 @@ pub struct AccountsIndex<T> {
spl_token_owner_index: SecondaryIndex<RwLockSecondaryIndexEntry>,
roots_tracker: RwLock<RootsTracker>,
ongoing_scan_roots: RwLock<BTreeMap<Slot, u64>>,
// Each scan has some latest slot `S` that is the tip of the fork the scan
// is iterating over. The unique id of that slot `S` is recorded here (note we don't use
// `S` as the id because there can be more than one version of a slot `S`). If a fork
// is abandoned, all of the slots on that fork up to `S` will be removed via
// `AccountsDb::remove_unrooted_slots()`. When the scan finishes, it'll realize that the
// results of the scan may have been corrupted by `remove_unrooted_slots` and abort its results.
//
// `removed_bank_ids` tracks all the bank ids that were removed via `remove_unrooted_slots()` so any attempted scans
// on any of these banks fail. This is safe to purge once the associated Bank is dropped and
// scanning the fork with that Bank at the tip is no longer possible.
pub removed_bank_ids: Mutex<HashSet<BankId>>,
zero_lamport_pubkeys: DashSet<Pubkey>,
}
@ -610,6 +648,7 @@ impl<T> Default for AccountsIndex<T> {
),
roots_tracker: RwLock::<RootsTracker>::default(),
ongoing_scan_roots: RwLock::<BTreeMap<Slot, u64>>::default(),
removed_bank_ids: Mutex::<HashSet<BankId>>::default(),
zero_lamport_pubkeys: DashSet::<Pubkey>::default(),
}
}
@ -627,12 +666,24 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
&self,
metric_name: &'static str,
ancestors: &Ancestors,
scan_bank_id: BankId,
func: F,
scan_type: ScanTypes<R>,
) where
) -> Result<(), ScanError>
where
F: FnMut(&Pubkey, (&T, Slot)),
R: RangeBounds<Pubkey>,
{
{
let locked_removed_bank_ids = self.removed_bank_ids.lock().unwrap();
if locked_removed_bank_ids.contains(&scan_bank_id) {
return Err(ScanError::SlotRemoved {
slot: ancestors.max_slot(),
bank_id: scan_bank_id,
});
}
}
let max_root = {
let mut w_ongoing_scan_roots = self
// This lock is also grabbed by clean_accounts(), so clean
@ -809,6 +860,23 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
ongoing_scan_roots.remove(&max_root);
}
}
// If the fork with tip at bank `scan_bank_id` was removed during our scan, then the scan
// may have been corrupted, so abort the results.
let was_scan_corrupted = self
.removed_bank_ids
.lock()
.unwrap()
.contains(&scan_bank_id);
if was_scan_corrupted {
Err(ScanError::SlotRemoved {
slot: ancestors.max_slot(),
bank_id: scan_bank_id,
})
} else {
Ok(())
}
}
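Editor's note: condensed, the abort protocol is check-scan-recheck: consult `removed_bank_ids` before starting, run the scan, then consult it again, because `remove_unrooted_slots()` may have torn the results mid-scan. A standalone model with local types (not the `AccountsIndex` API):

    use std::collections::HashSet;
    use std::sync::Mutex;

    type BankId = u64;

    #[derive(Debug, PartialEq)]
    enum ScanError {
        SlotRemoved { bank_id: BankId },
    }

    struct Index {
        removed_bank_ids: Mutex<HashSet<BankId>>,
        entries: Vec<u64>,
    }

    impl Index {
        fn checked_scan(
            &self,
            scan_bank_id: BankId,
            mut func: impl FnMut(&u64),
        ) -> Result<(), ScanError> {
            // Check 1: never start a scan whose fork tip was already removed.
            if self.removed_bank_ids.lock().unwrap().contains(&scan_bank_id) {
                return Err(ScanError::SlotRemoved { bank_id: scan_bank_id });
            }
            for entry in &self.entries {
                func(entry);
            }
            // Check 2: if the fork tip was removed while we iterated, the results
            // may mix pre-purge and post-purge state, so abort them.
            if self.removed_bank_ids.lock().unwrap().contains(&scan_bank_id) {
                Err(ScanError::SlotRemoved { bank_id: scan_bank_id })
            } else {
                Ok(())
            }
        }
    }

    fn main() {
        let index = Index {
            removed_bank_ids: Mutex::new(HashSet::new()),
            entries: vec![1, 2, 3],
        };
        let mut seen = vec![];
        assert_eq!(index.checked_scan(9, |e| seen.push(*e)), Ok(()));
        // Simulate `remove_unrooted_slots()` marking bank id 9 as removed:
        index.removed_bank_ids.lock().unwrap().insert(9);
        assert!(index.checked_scan(9, |_| {}).is_err());
    }

The real implementation additionally reports the slot of the scanned fork tip via `Ancestors::max_slot()`, which is why that helper is introduced in this commit.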
fn do_unchecked_scan_accounts<F, R>(
@ -1008,7 +1076,12 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
}
/// call func with every pubkey and index visible from a given set of ancestors
pub(crate) fn scan_accounts<F>(&self, ancestors: &Ancestors, func: F)
pub(crate) fn scan_accounts<F>(
&self,
ancestors: &Ancestors,
scan_bank_id: BankId,
func: F,
) -> Result<(), ScanError>
where
F: FnMut(&Pubkey, (&T, Slot)),
{
@ -1016,9 +1089,10 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
self.do_checked_scan_accounts(
"",
ancestors,
scan_bank_id,
func,
ScanTypes::Unindexed(None::<Range<Pubkey>>),
);
)
}
pub(crate) fn unchecked_scan_accounts<F>(
@ -1048,7 +1122,13 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
}
/// call func with every pubkey and index visible from a given set of ancestors
pub(crate) fn index_scan_accounts<F>(&self, ancestors: &Ancestors, index_key: IndexKey, func: F)
pub(crate) fn index_scan_accounts<F>(
&self,
ancestors: &Ancestors,
scan_bank_id: BankId,
index_key: IndexKey,
func: F,
) -> Result<(), ScanError>
where
F: FnMut(&Pubkey, (&T, Slot)),
{
@ -1056,9 +1136,10 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
self.do_checked_scan_accounts(
"",
ancestors,
scan_bank_id,
func,
ScanTypes::<Range<Pubkey>>::Indexed(index_key),
);
)
}
pub fn get_rooted_entries(&self, slice: SlotSlice<T>, max: Option<Slot>) -> SlotList<T> {

View File

@ -75,6 +75,10 @@ impl Ancestors {
pub fn is_empty(&self) -> bool {
self.len() == 0
}
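/// Highest slot in this ancestor set. `RollingBitField::max()` is one past
/// the largest inserted slot, hence the `- 1` (assumes a non-empty set).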
pub fn max_slot(&self) -> Slot {
self.ancestors.max() - 1
}
}
#[cfg(test)]
pub mod tests {

View File

@ -39,7 +39,7 @@ use crate::{
TransactionLoadResult, TransactionLoaders,
},
accounts_db::{AccountShrinkThreshold, ErrorCounters, SnapshotStorages},
accounts_index::{AccountSecondaryIndexes, IndexKey},
accounts_index::{AccountSecondaryIndexes, IndexKey, ScanResult},
ancestors::{Ancestors, AncestorsForSerialization},
blockhash_queue::BlockhashQueue,
builtins::{self, ActivationType},
@ -68,7 +68,7 @@ use solana_sdk::{
AccountSharedData, InheritableAccountFields, ReadableAccount, WritableAccount,
},
clock::{
Epoch, Slot, SlotCount, SlotIndex, UnixTimestamp, DEFAULT_TICKS_PER_SECOND,
BankId, Epoch, Slot, SlotCount, SlotIndex, UnixTimestamp, DEFAULT_TICKS_PER_SECOND,
INITIAL_RENT_EPOCH, MAX_PROCESSING_AGE, MAX_RECENT_BLOCKHASHES,
MAX_TRANSACTION_FORWARDING_DELAY, SECONDS_PER_DAY,
},
@ -416,6 +416,8 @@ pub struct BankRc {
/// Current slot
pub(crate) slot: Slot,
pub(crate) bank_id_generator: Arc<AtomicU64>,
}
#[cfg(RUSTC_WITH_SPECIALIZATION)]
@ -430,6 +432,7 @@ impl AbiExample for BankRc {
// AbiExample for Accounts is specially implemented to contain a storage example
accounts: AbiExample::example(),
slot: AbiExample::example(),
bank_id_generator: Arc::new(AtomicU64::new(0)),
}
}
}
@ -440,6 +443,7 @@ impl BankRc {
accounts: Arc::new(accounts),
parent: RwLock::new(None),
slot,
bank_id_generator: Arc::new(AtomicU64::new(0)),
}
}
@ -909,6 +913,8 @@ pub struct Bank {
/// Bank slot (i.e. block)
slot: Slot,
bank_id: BankId,
/// Bank epoch
epoch: Epoch,
@ -965,8 +971,6 @@ pub struct Bank {
/// Protocol-level rewards that were distributed by this bank
pub rewards: RwLock<Vec<(Pubkey, RewardInfo)>>,
pub skip_drop: AtomicBool,
pub cluster_type: Option<ClusterType>,
pub lazy_rent_collection: AtomicBool,
@ -1131,6 +1135,7 @@ impl Bank {
)),
parent: RwLock::new(Some(parent.clone())),
slot,
bank_id_generator: parent.rc.bank_id_generator.clone(),
};
let src = StatusCacheRc {
status_cache: parent.src.status_cache.clone(),
@ -1139,10 +1144,12 @@ impl Bank {
let fee_rate_governor =
FeeRateGovernor::new_derived(&parent.fee_rate_governor, parent.signature_count());
let bank_id = rc.bank_id_generator.fetch_add(1, Relaxed) + 1;
let mut new = Bank {
rc,
src,
slot,
bank_id,
epoch,
blockhash_queue: RwLock::new(parent.blockhash_queue.read().unwrap().clone()),
@ -1184,7 +1191,6 @@ impl Bank {
hard_forks: parent.hard_forks.clone(),
last_vote_sync: AtomicU64::new(parent.last_vote_sync.load(Relaxed)),
rewards: RwLock::new(vec![]),
skip_drop: AtomicBool::new(false),
cluster_type: parent.cluster_type,
lazy_rent_collection: AtomicBool::new(parent.lazy_rent_collection.load(Relaxed)),
no_stake_rewrite: AtomicBool::new(parent.no_stake_rewrite.load(Relaxed)),
@ -1322,6 +1328,7 @@ impl Bank {
slots_per_year: fields.slots_per_year,
unused: genesis_config.unused,
slot: fields.slot,
bank_id: 0,
epoch: fields.epoch,
block_height: fields.block_height,
collector_id: fields.collector_id,
@ -1341,7 +1348,6 @@ impl Bank {
feature_builtins: new(),
last_vote_sync: new(),
rewards: new(),
skip_drop: new(),
cluster_type: Some(genesis_config.cluster_type),
lazy_rent_collection: new(),
no_stake_rewrite: new(),
@ -1441,6 +1447,10 @@ impl Bank {
self.slot
}
pub fn bank_id(&self) -> BankId {
self.bank_id
}
pub fn epoch(&self) -> Epoch {
self.epoch
}
@ -2658,7 +2668,7 @@ impl Bank {
}
}
pub fn remove_unrooted_slots(&self, slots: &[Slot]) {
pub fn remove_unrooted_slots(&self, slots: &[(Slot, BankId)]) {
self.rc.accounts.accounts_db.remove_unrooted_slots(slots)
}
@ -4335,38 +4345,49 @@ impl Bank {
.map(|(acc, _slot)| acc)
}
pub fn get_program_accounts(&self, program_id: &Pubkey) -> Vec<(Pubkey, AccountSharedData)> {
pub fn get_program_accounts(
&self,
program_id: &Pubkey,
) -> ScanResult<Vec<(Pubkey, AccountSharedData)>> {
self.rc
.accounts
.load_by_program(&self.ancestors, program_id)
.load_by_program(&self.ancestors, self.bank_id, program_id)
}
pub fn get_filtered_program_accounts<F: Fn(&AccountSharedData) -> bool>(
&self,
program_id: &Pubkey,
filter: F,
) -> Vec<(Pubkey, AccountSharedData)> {
self.rc
.accounts
.load_by_program_with_filter(&self.ancestors, program_id, filter)
) -> ScanResult<Vec<(Pubkey, AccountSharedData)>> {
self.rc.accounts.load_by_program_with_filter(
&self.ancestors,
self.bank_id,
program_id,
filter,
)
}
pub fn get_filtered_indexed_accounts<F: Fn(&AccountSharedData) -> bool>(
&self,
index_key: &IndexKey,
filter: F,
) -> Vec<(Pubkey, AccountSharedData)> {
self.rc
.accounts
.load_by_index_key_with_filter(&self.ancestors, index_key, filter)
) -> ScanResult<Vec<(Pubkey, AccountSharedData)>> {
self.rc.accounts.load_by_index_key_with_filter(
&self.ancestors,
self.bank_id,
index_key,
filter,
)
}
pub fn account_indexes_include_key(&self, key: &Pubkey) -> bool {
self.rc.accounts.account_indexes_include_key(key)
}
pub fn get_all_accounts_with_modified_slots(&self) -> Vec<(Pubkey, AccountSharedData, Slot)> {
self.rc.accounts.load_all(&self.ancestors)
pub fn get_all_accounts_with_modified_slots(
&self,
) -> ScanResult<Vec<(Pubkey, AccountSharedData, Slot)>> {
self.rc.accounts.load_all(&self.ancestors, self.bank_id)
}
pub fn get_program_accounts_modified_since_parent(
@ -4421,10 +4442,14 @@ impl Bank {
num: usize,
filter_by_address: &HashSet<Pubkey>,
filter: AccountAddressFilter,
) -> Vec<(Pubkey, u64)> {
self.rc
.accounts
.load_largest_accounts(&self.ancestors, num, filter_by_address, filter)
) -> ScanResult<Vec<(Pubkey, u64)>> {
self.rc.accounts.load_largest_accounts(
&self.ancestors,
self.bank_id,
num,
filter_by_address,
filter,
)
}
pub fn transaction_count(&self) -> u64 {
@ -5255,10 +5280,6 @@ impl Bank {
impl Drop for Bank {
fn drop(&mut self) {
if self.skip_drop.load(Relaxed) {
return;
}
if let Some(drop_callback) = self.drop_callback.read().unwrap().0.as_ref() {
drop_callback.callback(self);
} else {
@ -5266,7 +5287,9 @@ impl Drop for Bank {
// 1. Tests
// 2. At startup when replaying blockstore and there's no
// AccountsBackgroundService to perform cleanups yet.
self.rc.accounts.purge_slot(self.slot(), false);
self.rc
.accounts
.purge_slot(self.slot(), self.bank_id(), false);
}
}
}
@ -5307,7 +5330,9 @@ pub(crate) mod tests {
use crate::{
accounts_background_service::{AbsRequestHandler, SendDroppedBankCallback},
accounts_db::DEFAULT_ACCOUNTS_SHRINK_RATIO,
accounts_index::{AccountIndex, AccountMap, AccountSecondaryIndexes, ITER_BATCH_SIZE},
accounts_index::{
AccountIndex, AccountMap, AccountSecondaryIndexes, ScanError, ITER_BATCH_SIZE,
},
ancestors::Ancestors,
genesis_utils::{
activate_all_features, bootstrap_validator_stake_lamports,
@ -9145,7 +9170,7 @@ pub(crate) mod tests {
let parent = Arc::new(Bank::new(&genesis_config));
parent.restore_old_behavior_for_fragile_tests();
let genesis_accounts: Vec<_> = parent.get_all_accounts_with_modified_slots();
let genesis_accounts: Vec<_> = parent.get_all_accounts_with_modified_slots().unwrap();
assert!(
genesis_accounts
.iter()
@ -9173,11 +9198,11 @@ pub(crate) mod tests {
let bank1 = Arc::new(new_from_parent(&bank0));
bank1.squash();
assert_eq!(
bank0.get_program_accounts(&program_id),
bank0.get_program_accounts(&program_id).unwrap(),
vec![(pubkey0, account0.clone())]
);
assert_eq!(
bank1.get_program_accounts(&program_id),
bank1.get_program_accounts(&program_id).unwrap(),
vec![(pubkey0, account0)]
);
assert_eq!(
@ -9196,8 +9221,8 @@ pub(crate) mod tests {
let bank3 = Arc::new(new_from_parent(&bank2));
bank3.squash();
assert_eq!(bank1.get_program_accounts(&program_id).len(), 2);
assert_eq!(bank3.get_program_accounts(&program_id).len(), 2);
assert_eq!(bank1.get_program_accounts(&program_id).unwrap().len(), 2);
assert_eq!(bank3.get_program_accounts(&program_id).unwrap().len(), 2);
}
#[test]
@ -9217,8 +9242,9 @@ pub(crate) mod tests {
let account = AccountSharedData::new(1, 0, &program_id);
bank.store_account(&address, &account);
let indexed_accounts =
bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |_| true);
let indexed_accounts = bank
.get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |_| true)
.unwrap();
assert_eq!(indexed_accounts.len(), 1);
assert_eq!(indexed_accounts[0], (address, account));
@ -9229,12 +9255,14 @@ pub(crate) mod tests {
let new_account = AccountSharedData::new(1, 0, &another_program_id);
let bank = Arc::new(new_from_parent(&bank));
bank.store_account(&address, &new_account);
let indexed_accounts =
bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |_| true);
let indexed_accounts = bank
.get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |_| true)
.unwrap();
assert_eq!(indexed_accounts.len(), 1);
assert_eq!(indexed_accounts[0], (address, new_account.clone()));
let indexed_accounts =
bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(another_program_id), |_| true);
let indexed_accounts = bank
.get_filtered_indexed_accounts(&IndexKey::ProgramId(another_program_id), |_| true)
.unwrap();
assert_eq!(indexed_accounts.len(), 1);
assert_eq!(indexed_accounts[0], (address, new_account.clone()));
@ -9242,12 +9270,14 @@ pub(crate) mod tests {
let indexed_accounts = bank
.get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |account| {
account.owner() == &program_id
});
})
.unwrap();
assert!(indexed_accounts.is_empty());
let indexed_accounts = bank
.get_filtered_indexed_accounts(&IndexKey::ProgramId(another_program_id), |account| {
account.owner() == &another_program_id
});
})
.unwrap();
assert_eq!(indexed_accounts.len(), 1);
assert_eq!(indexed_accounts[0], (address, new_account));
}
@ -11895,13 +11925,27 @@ pub(crate) mod tests {
assert!(!debug.is_empty());
}
#[derive(Debug)]
enum AcceptableScanResults {
DroppedSlotError,
NoFailure,
Both,
}
fn test_store_scan_consistency<F: 'static>(
accounts_db_caching_enabled: bool,
update_f: F,
drop_callback: Option<Box<dyn DropCallback + Send + Sync>>,
acceptable_scan_results: AcceptableScanResults,
) where
F: Fn(Arc<Bank>, crossbeam_channel::Sender<Arc<Bank>>, Arc<HashSet<Pubkey>>, Pubkey, u64)
+ std::marker::Send,
F: Fn(
Arc<Bank>,
crossbeam_channel::Sender<Arc<Bank>>,
crossbeam_channel::Receiver<BankId>,
Arc<HashSet<Pubkey>>,
Pubkey,
u64,
) + std::marker::Send,
{
// Set up initial bank
let mut genesis_config = create_genesis_config_with_leader(
@ -11946,48 +11990,83 @@ pub(crate) mod tests {
// Thread that runs scan and constantly checks for
// consistency
let pubkeys_to_modify_ = pubkeys_to_modify.clone();
let exit_ = exit.clone();
// Channel over which the bank to scan is sent
let (bank_to_scan_sender, bank_to_scan_receiver): (
crossbeam_channel::Sender<Arc<Bank>>,
crossbeam_channel::Receiver<Arc<Bank>>,
) = bounded(1);
let scan_thread = Builder::new()
.name("scan".to_string())
.spawn(move || loop {
if exit_.load(Relaxed) {
return;
}
if let Ok(bank_to_scan) =
bank_to_scan_receiver.recv_timeout(Duration::from_millis(10))
{
let accounts = bank_to_scan.get_program_accounts(&program_id);
// Should never see empty accounts because no slot ever deleted
// any of the original accounts, and the scan should reflect the
// account state at some frozen slot `X` (no partial updates).
assert!(!accounts.is_empty());
let mut expected_lamports = None;
let mut target_accounts_found = HashSet::new();
for (pubkey, account) in accounts {
let account_balance = account.lamports();
if pubkeys_to_modify_.contains(&pubkey) {
target_accounts_found.insert(pubkey);
if let Some(expected_lamports) = expected_lamports {
assert_eq!(account_balance, expected_lamports);
} else {
// All pubkeys in the specified set should have the same balance
expected_lamports = Some(account_balance);
let (scan_finished_sender, scan_finished_receiver): (
crossbeam_channel::Sender<BankId>,
crossbeam_channel::Receiver<BankId>,
) = unbounded();
let num_banks_scanned = Arc::new(AtomicU64::new(0));
let scan_thread = {
let exit = exit.clone();
let num_banks_scanned = num_banks_scanned.clone();
Builder::new()
.name("scan".to_string())
.spawn(move || {
loop {
info!("starting scan iteration");
if exit.load(Relaxed) {
info!("scan exiting");
return;
}
if let Ok(bank_to_scan) =
bank_to_scan_receiver.recv_timeout(Duration::from_millis(10))
{
info!("scanning program accounts for slot {}", bank_to_scan.slot());
let accounts_result = bank_to_scan.get_program_accounts(&program_id);
let _ = scan_finished_sender.send(bank_to_scan.bank_id());
num_banks_scanned.fetch_add(1, Relaxed);
match (&acceptable_scan_results, accounts_result.is_err()) {
(AcceptableScanResults::DroppedSlotError, _)
| (AcceptableScanResults::Both, true) => {
assert_eq!(
accounts_result,
Err(ScanError::SlotRemoved {
slot: bank_to_scan.slot(),
bank_id: bank_to_scan.bank_id()
})
);
}
(AcceptableScanResults::NoFailure, _)
| (AcceptableScanResults::Both, false) => {
assert!(accounts_result.is_ok())
}
}
// Should never see empty accounts because no slot ever deleted
// any of the original accounts, and the scan should reflect the
// account state at some frozen slot `X` (no partial updates).
if let Ok(accounts) = accounts_result {
assert!(!accounts.is_empty());
let mut expected_lamports = None;
let mut target_accounts_found = HashSet::new();
for (pubkey, account) in accounts {
let account_balance = account.lamports();
if pubkeys_to_modify_.contains(&pubkey) {
target_accounts_found.insert(pubkey);
if let Some(expected_lamports) = expected_lamports {
assert_eq!(account_balance, expected_lamports);
} else {
// All pubkeys in the specified set should have the same balance
expected_lamports = Some(account_balance);
}
}
}
// Should've found all the accounts, i.e. no partial cleans should
// be detected
assert_eq!(target_accounts_found.len(), total_pubkeys_to_modify);
}
}
}
// Should've found all the accounts, i.e. no partial cleans should
// be detected
assert_eq!(target_accounts_found.len(), total_pubkeys_to_modify);
}
})
.unwrap();
})
.unwrap()
};
// Thread that constantly updates the accounts, sets
// roots, and cleans
@ -11997,6 +12076,7 @@ pub(crate) mod tests {
update_f(
bank0,
bank_to_scan_sender,
scan_finished_receiver,
pubkeys_to_modify,
program_id,
starting_lamports,
@ -12005,7 +12085,15 @@ pub(crate) mod tests {
.unwrap();
// Let the threads run until enough scans have completed; the scan thread
// asserts that no scan ever saw mixed slots
let min_expected_number_of_scans = 5;
std::thread::sleep(Duration::new(5, 0));
while num_banks_scanned.load(Relaxed) <= min_expected_number_of_scans {
std::thread::sleep(Duration::from_millis(100));
}
exit.store(true, Relaxed);
scan_thread.join().unwrap();
update_thread.join().unwrap();
@ -12023,6 +12111,7 @@ pub(crate) mod tests {
*accounts_db_caching_enabled,
move |bank0,
bank_to_scan_sender,
_scan_finished_receiver,
pubkeys_to_modify,
program_id,
starting_lamports| {
@ -12107,6 +12196,7 @@ pub(crate) mod tests {
Some(Box::new(SendDroppedBankCallback::new(
pruned_banks_sender.clone(),
))),
AcceptableScanResults::NoFailure,
)
}
}
@ -12116,7 +12206,12 @@ pub(crate) mod tests {
for accounts_db_caching_enabled in &[false, true] {
test_store_scan_consistency(
*accounts_db_caching_enabled,
|bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| {
|bank0,
bank_to_scan_sender,
_scan_finished_receiver,
pubkeys_to_modify,
program_id,
starting_lamports| {
let mut current_bank = bank0.clone();
let mut prev_bank = bank0;
loop {
@ -12151,6 +12246,242 @@ pub(crate) mod tests {
}
},
None,
AcceptableScanResults::NoFailure,
);
}
}
fn setup_banks_on_fork_to_remove(
bank0: Arc<Bank>,
pubkeys_to_modify: Arc<HashSet<Pubkey>>,
program_id: &Pubkey,
starting_lamports: u64,
num_banks_on_fork: usize,
step_size: usize,
) -> (Arc<Bank>, Vec<(Slot, BankId)>, Ancestors) {
// Need at least 2 keys to create inconsistency in account balances when deleting
// slots
assert!(pubkeys_to_modify.len() > 1);
// Tracks the bank at the tip of the to be created fork
let mut bank_at_fork_tip = bank0;
// All the slots on the fork except slot 0
let mut slots_on_fork = Vec::with_capacity(num_banks_on_fork);
// All accounts in each set of `step_size` slots will have the same account balances.
// The account balances change every `step_size` banks. Thus if you delete any one
// of the latest `step_size` slots, you will see varying account balances when
// loading the accounts.
assert!(num_banks_on_fork >= 2);
assert!(step_size >= 2);
let pubkeys_to_modify: Vec<Pubkey> = pubkeys_to_modify.iter().cloned().collect();
let pubkeys_to_modify_per_slot = (pubkeys_to_modify.len() / step_size).max(1);
for _ in (0..num_banks_on_fork).step_by(step_size) {
let mut lamports_this_round = 0;
for i in 0..step_size {
bank_at_fork_tip = Arc::new(Bank::new_from_parent(
&bank_at_fork_tip,
&solana_sdk::pubkey::new_rand(),
bank_at_fork_tip.slot() + 1,
));
if lamports_this_round == 0 {
lamports_this_round = bank_at_fork_tip.bank_id() + starting_lamports + 1;
}
let pubkey_to_modify_starting_index = i * pubkeys_to_modify_per_slot;
let account = AccountSharedData::new(lamports_this_round, 0, program_id);
for pubkey_index_to_modify in pubkey_to_modify_starting_index
..pubkey_to_modify_starting_index + pubkeys_to_modify_per_slot
{
let key = pubkeys_to_modify[pubkey_index_to_modify % pubkeys_to_modify.len()];
bank_at_fork_tip.store_account(&key, &account);
}
bank_at_fork_tip.freeze();
slots_on_fork.push((bank_at_fork_tip.slot(), bank_at_fork_tip.bank_id()));
}
}
let ancestors: Vec<(Slot, usize)> = slots_on_fork.iter().map(|(s, _)| (*s, 0)).collect();
let ancestors = Ancestors::from(ancestors);
(bank_at_fork_tip, slots_on_fork, ancestors)
}
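To make the invariant concrete: each round of `step_size` banks writes a single shared lamport value derived from the round's first bank id, so any two modified accounts with different balances must come from different rounds. A self-contained sketch of the value schedule (bank ids are assumed sequential here purely for illustration; the concrete numbers are made up):

// Mirror the schedule above: the shared value for a round is derived from
// the bank id of the first bank created in that round.
fn lamports_for_round(first_bank_id_in_round: u64, starting_lamports: u64) -> u64 {
    first_bank_id_in_round + starting_lamports + 1
}

fn main() {
    let (starting_lamports, step_size) = (100u64, 2u64);
    // With sequential bank ids, banks (1, 2) share one value, (3, 4) the next, ...
    for round in 0..3u64 {
        let first_bank_id = 1 + round * step_size;
        println!(
            "banks {}..={} store {} lamports",
            first_bank_id,
            first_bank_id + step_size - 1,
            lamports_for_round(first_bank_id, starting_lamports)
        );
    }
}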
#[test]
fn test_remove_unrooted_before_scan() {
for accounts_db_caching_enabled in &[false, true] {
test_store_scan_consistency(
*accounts_db_caching_enabled,
|bank0,
bank_to_scan_sender,
scan_finished_receiver,
pubkeys_to_modify,
program_id,
starting_lamports| {
loop {
let (bank_at_fork_tip, slots_on_fork, ancestors) =
setup_banks_on_fork_to_remove(
bank0.clone(),
pubkeys_to_modify.clone(),
&program_id,
starting_lamports,
10,
2,
);
// Test removing the slots before the scan starts; this should cause a
// SlotRemoved error every time
for k in pubkeys_to_modify.iter() {
assert!(bank_at_fork_tip.load_slow(&ancestors, k).is_some());
}
bank_at_fork_tip.remove_unrooted_slots(&slots_on_fork);
// Accounts on this fork should not be found after removal
for k in pubkeys_to_modify.iter() {
assert!(bank_at_fork_tip.load_slow(&ancestors, k).is_none());
}
if bank_to_scan_sender.send(bank_at_fork_tip.clone()).is_err() {
return;
}
// Wait for scan to finish before starting next iteration
let finished_scan_bank_id = scan_finished_receiver.recv();
if finished_scan_bank_id.is_err() {
return;
}
assert_eq!(finished_scan_bank_id.unwrap(), bank_at_fork_tip.bank_id());
}
},
None,
// The slots are removed before each scan starts, so every scan should error
AcceptableScanResults::DroppedSlotError,
);
}
}
#[test]
fn test_remove_unrooted_scan_then_recreate_same_slot_before_scan() {
for accounts_db_caching_enabled in &[false, true] {
test_store_scan_consistency(
*accounts_db_caching_enabled,
|bank0,
bank_to_scan_sender,
scan_finished_receiver,
pubkeys_to_modify,
program_id,
starting_lamports| {
let mut prev_bank = bank0.clone();
loop {
let start = Instant::now();
let (bank_at_fork_tip, slots_on_fork, ancestors) =
setup_banks_on_fork_to_remove(
bank0.clone(),
pubkeys_to_modify.clone(),
&program_id,
starting_lamports,
10,
2,
);
info!("setting up banks elapsed: {}", start.elapsed().as_millis());
// The fork is removed at the end of each iteration via
// `remove_unrooted_slots()`; the next iteration recreates the same slots,
// and only after recreating them do we send the old bank for scanning.
// Skip scanning bank 0 on the first iteration of the loop, since its
// accounts aren't being removed
if prev_bank.slot() != 0 {
info!(
"sending bank with slot: {:?}, elapsed: {}",
prev_bank.slot(),
start.elapsed().as_millis()
);
// Although we dumped the slots last iteration via `remove_unrooted_slots()`,
// we've recreated those slots this iteration, so they should be findable
// again
for k in pubkeys_to_modify.iter() {
assert!(bank_at_fork_tip.load_slow(&ancestors, k).is_some());
}
// Now that we've recreated the slots removed in the previous loop
// iteration, send the previous bank; its scan should fail even though
// the same slots were recreated
if bank_to_scan_sender.send(prev_bank.clone()).is_err() {
return;
}
let finished_scan_bank_id = scan_finished_receiver.recv();
if finished_scan_bank_id.is_err() {
return;
}
// Wait for scan to finish before starting next iteration
assert_eq!(finished_scan_bank_id.unwrap(), prev_bank.bank_id());
}
bank_at_fork_tip.remove_unrooted_slots(&slots_on_fork);
prev_bank = bank_at_fork_tip;
}
},
None,
// The scanned bank's slots were removed before the scan starts (and later
// recreated under new bank ids), so every scan should error
AcceptableScanResults::DroppedSlotError,
);
}
}
#[test]
fn test_remove_unrooted_scan_interleaved_with_remove_unrooted_slots() {
for accounts_db_caching_enabled in &[false, true] {
test_store_scan_consistency(
*accounts_db_caching_enabled,
|bank0,
bank_to_scan_sender,
scan_finished_receiver,
pubkeys_to_modify,
program_id,
starting_lamports| {
loop {
let step_size = 2;
let (bank_at_fork_tip, slots_on_fork, ancestors) =
setup_banks_on_fork_to_remove(
bank0.clone(),
pubkeys_to_modify.clone(),
&program_id,
starting_lamports,
10,
step_size,
);
// Although we dumped the slots last iteration via `remove_unrooted_slots()`,
// we've recreated those slots this iteration, so they should be findable
// again
for k in pubkeys_to_modify.iter() {
assert!(bank_at_fork_tip.load_slow(&ancestors, k).is_some());
}
// Send the new bank for scanning. One of its latest slots is removed
// below while the scan may still be running, so the scan may or may
// not observe the removal
if bank_to_scan_sender.send(bank_at_fork_tip.clone()).is_err() {
return;
}
// Remove one (fewer than `step_size`) of the *latest* slots while the scan
// is happening. This should create an inconsistency between the account
// balances stored in that slot and those stored in earlier slots
let slot_to_remove = *slots_on_fork.last().unwrap();
bank_at_fork_tip.remove_unrooted_slots(&[slot_to_remove]);
// Wait for scan to finish before starting next iteration
let finished_scan_bank_id = scan_finished_receiver.recv();
if finished_scan_bank_id.is_err() {
return;
}
assert_eq!(finished_scan_bank_id.unwrap(), bank_at_fork_tip.bank_id());
// Remove the rest of the slots before the next iteration
for (slot, bank_id) in slots_on_fork {
bank_at_fork_tip.remove_unrooted_slots(&[(slot, bank_id)]);
}
}
},
None,
// Slots are removed while the scan may still be running, so the scan may
// either succeed or fail with a SlotRemoved error
AcceptableScanResults::Both,
);
}
}
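For non-test callers, the `Both` case is the operationally interesting one: a scan racing `remove_unrooted_slots()` can fail on a bank whose fork was abandoned, and the natural response is to retry on a freshly resolved bank. A bounded-retry sketch under that assumption (`pick_bank` and the wrapper are hypothetical; `ScanError` is assumed importable alongside `ScanResult` from `accounts_index`, matching the variant asserted in the tests above):

use std::sync::Arc;

use solana_runtime::{
    accounts_index::{ScanError, ScanResult},
    bank::Bank,
};
use solana_sdk::{account::AccountSharedData, pubkey::Pubkey};

// Hypothetical wrapper: retry a program-accounts scan a bounded number of
// times when the scanned bank's slots were removed mid-scan.
fn scan_with_retry(
    mut pick_bank: impl FnMut() -> Arc<Bank>,
    program_id: &Pubkey,
    max_attempts: usize,
) -> ScanResult<Vec<(Pubkey, AccountSharedData)>> {
    assert!(max_attempts > 0);
    let mut last_err = None;
    for _ in 0..max_attempts {
        match pick_bank().get_program_accounts(program_id) {
            // A removed slot may just mean we scanned a dying fork; retry
            // on whatever bank the caller resolves next.
            Err(err @ ScanError::SlotRemoved { .. }) => last_err = Some(err),
            other => return other,
        }
    }
    Err(last_err.unwrap())
}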
@ -12664,21 +12995,25 @@ pub(crate) mod tests {
// Return only one largest account
assert_eq!(
bank.get_largest_accounts(1, &pubkeys_hashset, AccountAddressFilter::Include),
bank.get_largest_accounts(1, &pubkeys_hashset, AccountAddressFilter::Include)
.unwrap(),
vec![(pubkeys[4], sol_to_lamports(5.0))]
);
assert_eq!(
bank.get_largest_accounts(1, &HashSet::new(), AccountAddressFilter::Exclude),
bank.get_largest_accounts(1, &HashSet::new(), AccountAddressFilter::Exclude)
.unwrap(),
vec![(pubkeys[4], sol_to_lamports(5.0))]
);
assert_eq!(
bank.get_largest_accounts(1, &exclude4, AccountAddressFilter::Exclude),
bank.get_largest_accounts(1, &exclude4, AccountAddressFilter::Exclude)
.unwrap(),
vec![(pubkeys[3], sol_to_lamports(4.0))]
);
// Return all added accounts
let results =
bank.get_largest_accounts(10, &pubkeys_hashset, AccountAddressFilter::Include);
let results = bank
.get_largest_accounts(10, &pubkeys_hashset, AccountAddressFilter::Include)
.unwrap();
assert_eq!(results.len(), sorted_accounts.len());
for pubkey_balance in sorted_accounts.iter() {
assert!(results.contains(pubkey_balance));
@ -12688,7 +13023,9 @@ pub(crate) mod tests {
assert_eq!(sorted_results, results);
let expected_accounts = sorted_accounts[1..].to_vec();
let results = bank.get_largest_accounts(10, &exclude4, AccountAddressFilter::Exclude);
let results = bank
.get_largest_accounts(10, &exclude4, AccountAddressFilter::Exclude)
.unwrap();
// results include 5 Bank builtins
assert_eq!(results.len(), 10);
for pubkey_balance in expected_accounts.iter() {
@ -12700,14 +13037,18 @@ pub(crate) mod tests {
// Return 3 added accounts
let expected_accounts = sorted_accounts[0..4].to_vec();
let results = bank.get_largest_accounts(4, &pubkeys_hashset, AccountAddressFilter::Include);
let results = bank
.get_largest_accounts(4, &pubkeys_hashset, AccountAddressFilter::Include)
.unwrap();
assert_eq!(results.len(), expected_accounts.len());
for pubkey_balance in expected_accounts.iter() {
assert!(results.contains(pubkey_balance));
}
let expected_accounts = expected_accounts[1..4].to_vec();
let results = bank.get_largest_accounts(3, &exclude4, AccountAddressFilter::Exclude);
let results = bank
.get_largest_accounts(3, &exclude4, AccountAddressFilter::Exclude)
.unwrap();
assert_eq!(results.len(), expected_accounts.len());
for pubkey_balance in expected_accounts.iter() {
assert!(results.contains(pubkey_balance));
@ -12719,7 +13060,8 @@ pub(crate) mod tests {
.cloned()
.collect();
assert_eq!(
bank.get_largest_accounts(2, &exclude, AccountAddressFilter::Exclude),
bank.get_largest_accounts(2, &exclude, AccountAddressFilter::Exclude)
.unwrap(),
vec![pubkeys_balances[3], pubkeys_balances[1]]
);
}
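The filter semantics exercised above, in one helper: `Include` restricts results to the given address set, `Exclude` removes the set from consideration. A propagation-style sketch (the helper name is invented, and the `AccountAddressFilter` import path is an assumption about where the enum used by `Bank::get_largest_accounts` lives):

use std::{collections::HashSet, sync::Arc};

use solana_runtime::{
    accounts::AccountAddressFilter, // assumed import path
    accounts_index::ScanResult,
    bank::Bank,
};
use solana_sdk::pubkey::Pubkey;

// Hypothetical helper: the top `n` balances excluding a deny-list, with any
// scan failure forwarded to the caller instead of unwrapped.
fn top_balances_excluding(
    bank: &Arc<Bank>,
    deny_list: &HashSet<Pubkey>,
    n: usize,
) -> ScanResult<Vec<(Pubkey, u64)>> {
    bank.get_largest_accounts(n, deny_list, AccountAddressFilter::Exclude)
}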

View File

@ -1,6 +1,6 @@
use {
crate::{
accounts_index::{AccountIndex, IndexKey},
accounts_index::{AccountIndex, IndexKey, ScanResult},
bank::Bank,
},
log::*,
@ -14,7 +14,7 @@ pub struct NonCirculatingSupply {
pub accounts: Vec<Pubkey>,
}
pub fn calculate_non_circulating_supply(bank: &Arc<Bank>) -> NonCirculatingSupply {
pub fn calculate_non_circulating_supply(bank: &Arc<Bank>) -> ScanResult<NonCirculatingSupply> {
debug!("Updating Bank supply, epoch: {}", bank.epoch());
let mut non_circulating_accounts_set: HashSet<Pubkey> = HashSet::new();
@ -38,10 +38,11 @@ pub fn calculate_non_circulating_supply(bank: &Arc<Bank>) -> NonCirculatingSuppl
// zero-lamport Account::Default() after being wiped and reinitialized in later
// updates. We include the redundant filter here to avoid returning these accounts.
|account| account.owner() == &solana_stake_program::id(),
)
)?
} else {
bank.get_program_accounts(&solana_stake_program::id())
bank.get_program_accounts(&solana_stake_program::id())?
};
for (pubkey, account) in stake_accounts.iter() {
let stake_account = StakeState::from(account).unwrap_or_default();
match stake_account {
@ -68,10 +69,10 @@ pub fn calculate_non_circulating_supply(bank: &Arc<Bank>) -> NonCirculatingSuppl
.map(|pubkey| bank.get_balance(&pubkey))
.sum();
NonCirculatingSupply {
Ok(NonCirculatingSupply {
lamports,
accounts: non_circulating_accounts_set.into_iter().collect(),
}
})
}
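With `calculate_non_circulating_supply` now fallible, callers choose between unwrapping (as the tests below do) and propagating. A sketch of the propagating style, assuming a hypothetical caller that also returns `ScanResult` (imports as at the top of this file):

// Hypothetical caller: forward a scan failure with `?` instead of
// panicking, leaving error translation to the RPC layer.
fn non_circulating_lamports(bank: &Arc<Bank>) -> ScanResult<u64> {
    Ok(calculate_non_circulating_supply(bank)?.lamports)
}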
// Mainnet-beta accounts that should be considered non-circulating
@ -256,7 +257,7 @@ mod tests {
+ sysvar_and_native_program_delta,
);
let non_circulating_supply = calculate_non_circulating_supply(&bank);
let non_circulating_supply = calculate_non_circulating_supply(&bank).unwrap();
assert_eq!(
non_circulating_supply.lamports,
(num_non_circulating_accounts + num_stake_accounts) * balance
@ -274,7 +275,7 @@ mod tests {
&AccountSharedData::new(new_balance, 0, &Pubkey::default()),
);
}
let non_circulating_supply = calculate_non_circulating_supply(&bank);
let non_circulating_supply = calculate_non_circulating_supply(&bank).unwrap();
assert_eq!(
non_circulating_supply.lamports,
(num_non_circulating_accounts * new_balance) + (num_stake_accounts * balance)
@ -289,7 +290,7 @@ mod tests {
bank = Arc::new(new_from_parent(&bank));
}
assert_eq!(bank.epoch(), 1);
let non_circulating_supply = calculate_non_circulating_supply(&bank);
let non_circulating_supply = calculate_non_circulating_supply(&bank).unwrap();
assert_eq!(
non_circulating_supply.lamports,
num_non_circulating_accounts * new_balance

View File

@ -77,6 +77,10 @@ pub const MAX_TRANSACTION_FORWARDING_DELAY: usize = 6;
/// is some number of Ticks long.
pub type Slot = u64;
/// Uniquely distinguishes every version of a slot, even if the
/// slot number is the same, i.e. duplicate slots
pub type BankId = u64;
/// Epoch is a unit of time a given leader schedule is honored,
/// some number of Slots.
pub type Epoch = u64;
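A duplicate slot is precisely two banks that share a `Slot` but not a `BankId`. A minimal, self-contained illustration with made-up values:

type Slot = u64;
type BankId = u64;

fn main() {
    // Two competing versions of slot 100 (e.g. the original and a duplicate):
    // the same slot number, but distinct bank ids.
    let first: (Slot, BankId) = (100, 7);
    let duplicate: (Slot, BankId) = (100, 8);
    assert_eq!(first.0, duplicate.0); // same Slot
    assert_ne!(first.1, duplicate.1); // different BankId
    println!("slot {} has bank ids {} and {}", first.0, first.1, duplicate.1);
}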