diff --git a/client/src/rpc_custom_error.rs b/client/src/rpc_custom_error.rs index 400947dcd6..3e80c1a522 100644 --- a/client/src/rpc_custom_error.rs +++ b/client/src/rpc_custom_error.rs @@ -1,5 +1,5 @@ //! Implementation defined RPC server errors - +use thiserror::Error; use { crate::rpc_response::RpcSimulateTransactionResult, jsonrpc_core::{Error, ErrorCode}, @@ -17,35 +17,40 @@ pub const JSON_RPC_SERVER_ERROR_NO_SNAPSHOT: i64 = -32008; pub const JSON_RPC_SERVER_ERROR_LONG_TERM_STORAGE_SLOT_SKIPPED: i64 = -32009; pub const JSON_RPC_SERVER_ERROR_KEY_EXCLUDED_FROM_SECONDARY_INDEX: i64 = -32010; pub const JSON_RPC_SERVER_ERROR_TRANSACTION_HISTORY_NOT_AVAILABLE: i64 = -32011; +pub const JSON_RPC_SCAN_ERROR: i64 = -32012; +#[derive(Error, Debug)] pub enum RpcCustomError { + #[error("BlockCleanedUp")] BlockCleanedUp { slot: Slot, first_available_block: Slot, }, + #[error("SendTransactionPreflightFailure")] SendTransactionPreflightFailure { message: String, result: RpcSimulateTransactionResult, }, + #[error("TransactionSignatureVerificationFailure")] TransactionSignatureVerificationFailure, - BlockNotAvailable { - slot: Slot, - }, - NodeUnhealthy { - num_slots_behind: Option, - }, + #[error("BlockNotAvailable")] + BlockNotAvailable { slot: Slot }, + #[error("NodeUnhealthy")] + NodeUnhealthy { num_slots_behind: Option }, + #[error("TransactionPrecompileVerificationFailure")] TransactionPrecompileVerificationFailure(solana_sdk::transaction::TransactionError), - SlotSkipped { - slot: Slot, - }, + #[error("SlotSkipped")] + SlotSkipped { slot: Slot }, + #[error("NoSnapshot")] NoSnapshot, - LongTermStorageSlotSkipped { - slot: Slot, - }, - KeyExcludedFromSecondaryIndex { - index_key: String, - }, + #[error("LongTermStorageSlotSkipped")] + LongTermStorageSlotSkipped { slot: Slot }, + #[error("KeyExcludedFromSecondaryIndex")] + KeyExcludedFromSecondaryIndex { index_key: String }, + #[error("TransactionHistoryNotAvailable")] TransactionHistoryNotAvailable, + #[error("ScanError")] + ScanError { message: String }, } #[derive(Debug, Serialize, Deserialize)] @@ -141,6 +146,11 @@ impl From for Error { message: "Transaction history is not available from this node".to_string(), data: None, }, + RpcCustomError::ScanError { message } => Self { + code: ErrorCode::ServerError(JSON_RPC_SCAN_ERROR), + message, + data: None, + }, } } } diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index ceb0b2c65f..bbfaf6d9f4 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -17,7 +17,12 @@ use solana_ledger::{ }; use solana_measure::measure::Measure; use solana_runtime::{bank::Bank, bank_forks::BankForks, contains::Contains}; -use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}; +use solana_sdk::{ + clock::{BankId, Slot}, + epoch_schedule::EpochSchedule, + pubkey::Pubkey, + timing::timestamp, +}; use std::{ collections::{HashMap, HashSet}, iter::Iterator, @@ -559,7 +564,7 @@ impl RepairService { #[allow(dead_code)] fn process_new_duplicate_slots( - new_duplicate_slots: &[Slot], + new_duplicate_slots: &[(Slot, BankId)], duplicate_slot_repair_statuses: &mut HashMap, cluster_slots: &ClusterSlots, root_bank: &Bank, @@ -568,7 +573,7 @@ impl RepairService { duplicate_slots_reset_sender: &DuplicateSlotsResetSender, repair_validators: &Option>, ) { - for slot in new_duplicate_slots { + for (slot, bank_id) in new_duplicate_slots { warn!( "Cluster confirmed slot: {}, dumping our current version and repairing", slot @@ -577,7 +582,7 @@ impl 
RepairService {
         root_bank.clear_slot_signatures(*slot);

         // Clear the accounts for this slot
-        root_bank.remove_unrooted_slots(&[*slot]);
+        root_bank.remove_unrooted_slots(&[(*slot, *bank_id)]);

         // Clear the slot-related data in blockstore. This will:
         // 1) Clear old shreds allowing new ones to be inserted
@@ -1139,6 +1144,7 @@ mod test {
         );
         let bank0 = Arc::new(Bank::new(&genesis_config));
         let bank9 = Bank::new_from_parent(&bank0, &Pubkey::default(), duplicate_slot);
+        let duplicate_bank_id = bank9.bank_id();
         let old_balance = bank9.get_balance(&keypairs.node_keypair.pubkey());
         bank9
             .transfer(10_000, &mint_keypair, &keypairs.node_keypair.pubkey())
             .unwrap();
@@ -1156,7 +1162,7 @@ mod test {
         assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_some());

         RepairService::process_new_duplicate_slots(
-            &[duplicate_slot],
+            &[(duplicate_slot, duplicate_bank_id)],
             &mut duplicate_slot_repair_statuses,
             &cluster_slots,
             &bank9,
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index b582e5fef8..cb8833cc17 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -850,12 +850,6 @@ impl ReplayStage {
                 // Clear the duplicate banks from BankForks
                 {
                     let mut w_bank_forks = bank_forks.write().unwrap();
-                    // Purging should have already been taken care of by logic
-                    // in repair_service, so make sure drop implementation doesn't
-                    // run
-                    if let Some(b) = w_bank_forks.get(*d) {
-                        b.skip_drop.store(true, Ordering::Relaxed)
-                    }
                     w_bank_forks.remove(*d);
                 }
             }
diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs
index 4a3aaf047b..7b27026a6f 100644
--- a/ledger-tool/src/main.rs
+++ b/ledger-tool/src/main.rs
@@ -2041,6 +2041,7 @@ fn main() {
                     if remove_stake_accounts {
                         for (address, mut account) in bank
                             .get_program_accounts(&solana_stake_program::id())
+                            .unwrap()
                             .into_iter()
                         {
                             account.set_lamports(0);
@@ -2074,6 +2075,7 @@ fn main() {
                     // Delete existing vote accounts
                     for (address, mut account) in bank
                         .get_program_accounts(&solana_vote_program::id())
+                        .unwrap()
                         .into_iter()
                     {
                         account.set_lamports(0);
@@ -2235,6 +2237,7 @@ fn main() {
                 let accounts: BTreeMap<_, _> = bank
                     .get_all_accounts_with_modified_slots()
+                    .unwrap()
                     .into_iter()
                     .filter(|(pubkey, _account, _slot)| {
                         include_sysvars || !solana_sdk::sysvar::is_sysvar_id(pubkey)
diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs
index ed9822944f..6aea3fe50c 100644
--- a/rpc/src/rpc.rs
+++ b/rpc/src/rpc.rs
@@ -92,6 +92,8 @@ use {
     tokio::runtime::Runtime,
 };

+type RpcCustomResult<T> = std::result::Result<T, RpcCustomError>;
+
 pub const MAX_REQUEST_PAYLOAD_SIZE: usize = 50 * (1 << 10); // 50kB
 pub const PERFORMANCE_SAMPLES_LIMIT: usize = 720;

@@ -705,18 +707,23 @@ impl JsonRpcRequestProcessor {
     fn get_largest_accounts(
         &self,
         config: Option<RpcLargestAccountsConfig>,
-    ) -> RpcResponse<Vec<RpcAccountBalance>> {
+    ) -> RpcCustomResult<RpcResponse<Vec<RpcAccountBalance>>> {
         let config = config.unwrap_or_default();
         let bank = self.bank(config.commitment);

         if let Some((slot, accounts)) = self.get_cached_largest_accounts(&config.filter) {
-            Response {
+            Ok(Response {
                 context: RpcResponseContext { slot },
                 value: accounts,
-            }
+            })
         } else {
             let (addresses, address_filter) = if let Some(filter) = config.clone().filter {
-                let non_circulating_supply = calculate_non_circulating_supply(&bank);
+                let non_circulating_supply =
+                    calculate_non_circulating_supply(&bank).map_err(|e| {
+                        RpcCustomError::ScanError {
+                            message: e.to_string(),
+                        }
+                    })?;
                 let addresses = non_circulating_supply.accounts.into_iter().collect();
                 let address_filter = match filter {
                     RpcLargestAccountsFilter::Circulating => AccountAddressFilter::Exclude,
@@ -728,6 +735,9 @@ impl
JsonRpcRequestProcessor {
             };
             let accounts = bank
                 .get_largest_accounts(NUM_LARGEST_ACCOUNTS, &addresses, address_filter)
+                .map_err(|e| RpcCustomError::ScanError {
+                    message: e.to_string(),
+                })?
                 .into_iter()
                 .map(|(address, lamports)| RpcAccountBalance {
                     address: address.to_string(),
                     lamports,
                 })
                 .collect::<Vec<RpcAccountBalance>>();

             self.set_cached_largest_accounts(&config.filter, bank.slot(), &accounts);
-            new_response(&bank, accounts)
+            Ok(new_response(&bank, accounts))
         }
     }

-    fn get_supply(&self, commitment: Option<CommitmentConfig>) -> RpcResponse<RpcSupply> {
+    fn get_supply(
+        &self,
+        commitment: Option<CommitmentConfig>,
+    ) -> RpcCustomResult<RpcResponse<RpcSupply>> {
         let bank = self.bank(commitment);
-        let non_circulating_supply = calculate_non_circulating_supply(&bank);
+        let non_circulating_supply =
+            calculate_non_circulating_supply(&bank).map_err(|e| RpcCustomError::ScanError {
+                message: e.to_string(),
+            })?;
         let total_supply = bank.capitalization();
-        new_response(
+        Ok(new_response(
             &bank,
             RpcSupply {
                 total: total_supply,
@@ -756,7 +772,7 @@ impl JsonRpcRequestProcessor {
                     .map(|pubkey| pubkey.to_string())
                     .collect(),
             },
-        )
+        ))
     }

     fn get_vote_accounts(
@@ -1738,7 +1754,7 @@ impl JsonRpcRequestProcessor {
         bank: &Arc<Bank>,
         program_id: &Pubkey,
         filters: Vec<RpcFilterType>,
-    ) -> Result<Vec<(Pubkey, AccountSharedData)>> {
+    ) -> RpcCustomResult<Vec<(Pubkey, AccountSharedData)>> {
         let filter_closure = |account: &AccountSharedData| {
             filters.iter().all(|filter_type| match filter_type {
                 RpcFilterType::DataSize(size) => account.data().len() as u64 == *size,
                 RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data()),
             })
         };
         if !self.config.account_indexes.include_key(program_id) {
             return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
                 index_key: program_id.to_string(),
-            }
-            .into());
+            });
         }
-        Ok(
-            bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(*program_id), |account| {
+        Ok(bank
+            .get_filtered_indexed_accounts(&IndexKey::ProgramId(*program_id), |account| {
                 // The program-id account index checks for Account owner on inclusion. However, due
                 // to the current AccountsDb implementation, an account may remain in storage as a
                 // zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later
                 // updates. We include the redundant filters here to avoid returning these
                 // accounts.
                 account.owner() == program_id && filter_closure(account)
-            }),
-        )
+            })
+            .map_err(|e| RpcCustomError::ScanError {
+                message: e.to_string(),
+            })?)
         } else {
-            Ok(bank.get_filtered_program_accounts(program_id, filter_closure))
+            Ok(bank
+                .get_filtered_program_accounts(program_id, filter_closure)
+                .map_err(|e| RpcCustomError::ScanError {
+                    message: e.to_string(),
+                })?)
         }
     }

@@ -1777,7 +1798,7 @@ impl JsonRpcRequestProcessor {
         bank: &Arc<Bank>,
         owner_key: &Pubkey,
         mut filters: Vec<RpcFilterType>,
-    ) -> Result<Vec<(Pubkey, AccountSharedData)>> {
+    ) -> RpcCustomResult<Vec<(Pubkey, AccountSharedData)>> {
         // The by-owner accounts index checks for Token Account state and Owner address on
         // inclusion. However, due to the current AccountsDb implementation, an account may remain
        // in storage as a zero-lamport AccountSharedData::Default() after being wiped and reinitialized in
@@ -1802,19 +1823,19 @@ impl JsonRpcRequestProcessor {
         if !self.config.account_indexes.include_key(owner_key) {
             return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
                 index_key: owner_key.to_string(),
-            }
-            .into());
+            });
         }
-        Ok(bank.get_filtered_indexed_accounts(
-            &IndexKey::SplTokenOwner(*owner_key),
-            |account| {
+        Ok(bank
+            .get_filtered_indexed_accounts(&IndexKey::SplTokenOwner(*owner_key), |account| {
                 account.owner() == &spl_token_id_v2_0()
                     && filters.iter().all(|filter_type| match filter_type {
                         RpcFilterType::DataSize(size) => account.data().len() as u64 == *size,
                         RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data()),
                     })
-            },
-        ))
+            })
+            .map_err(|e| RpcCustomError::ScanError {
+                message: e.to_string(),
+            })?)
         } else {
             self.get_filtered_program_accounts(bank, &spl_token_id_v2_0(), filters)
         }
@@ -1826,7 +1847,7 @@ impl JsonRpcRequestProcessor {
         &self,
         bank: &Arc<Bank>,
         mint_key: &Pubkey,
         mut filters: Vec<RpcFilterType>,
-    ) -> Result<Vec<(Pubkey, AccountSharedData)>> {
+    ) -> RpcCustomResult<Vec<(Pubkey, AccountSharedData)>> {
         // The by-mint accounts index checks for Token Account state and Mint address on inclusion.
         // However, due to the current AccountsDb implementation, an account may remain in storage
         // as a zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later
@@ -1850,18 +1871,19 @@ impl JsonRpcRequestProcessor {
         if !self.config.account_indexes.include_key(mint_key) {
             return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
                 index_key: mint_key.to_string(),
-            }
-            .into());
+            });
         }
-        Ok(
-            bank.get_filtered_indexed_accounts(&IndexKey::SplTokenMint(*mint_key), |account| {
+        Ok(bank
+            .get_filtered_indexed_accounts(&IndexKey::SplTokenMint(*mint_key), |account| {
                 account.owner() == &spl_token_id_v2_0()
                     && filters.iter().all(|filter_type| match filter_type {
                         RpcFilterType::DataSize(size) => account.data().len() as u64 == *size,
                         RpcFilterType::Memcmp(compare) => compare.bytes_match(&account.data()),
                     })
-            }),
-        )
+            })
+            .map_err(|e| RpcCustomError::ScanError {
+                message: e.to_string(),
+            })?)
         } else {
             self.get_filtered_program_accounts(bank, &spl_token_id_v2_0(), filters)
         }
@@ -2872,7 +2894,7 @@ pub mod rpc_full {
             config: Option<RpcLargestAccountsConfig>,
         ) -> Result<RpcResponse<Vec<RpcAccountBalance>>> {
             debug!("get_largest_accounts rpc request received");
-            Ok(meta.get_largest_accounts(config))
+            Ok(meta.get_largest_accounts(config)?)
         }

         fn get_supply(
             &self,
             commitment: Option<CommitmentConfig>,
         ) -> Result<RpcResponse<RpcSupply>> {
             debug!("get_supply rpc request received");
-            Ok(meta.get_supply(commitment))
+            Ok(meta.get_supply(commitment)?)
} fn request_airdrop( diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 0d8adf819f..d911043214 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -246,6 +246,7 @@ fn process_rest(bank_forks: &Arc>, path: &str) -> Option, filter: AccountAddressFilter, - ) -> Vec<(Pubkey, u64)> { + ) -> ScanResult> { if num == 0 { - return vec![]; + return Ok(vec![]); } let account_balances = self.accounts_db.scan_accounts( ancestors, + bank_id, |collector: &mut BinaryHeap>, option| { if let Some((pubkey, account, _slot)) = option { if account.lamports() == 0 { @@ -619,12 +621,12 @@ impl Accounts { collector.push(Reverse((account.lamports(), *pubkey))); } }, - ); - account_balances + )?; + Ok(account_balances .into_sorted_vec() .into_iter() .map(|Reverse((balance, pubkey))| (pubkey, balance)) - .collect() + .collect()) } pub fn calculate_capitalization( @@ -687,10 +689,12 @@ impl Accounts { pub fn load_by_program( &self, ancestors: &Ancestors, + bank_id: BankId, program_id: &Pubkey, - ) -> Vec<(Pubkey, AccountSharedData)> { + ) -> ScanResult> { self.accounts_db.scan_accounts( ancestors, + bank_id, |collector: &mut Vec<(Pubkey, AccountSharedData)>, some_account_tuple| { Self::load_while_filtering(collector, some_account_tuple, |account| { account.owner() == program_id @@ -702,11 +706,13 @@ impl Accounts { pub fn load_by_program_with_filter bool>( &self, ancestors: &Ancestors, + bank_id: BankId, program_id: &Pubkey, filter: F, - ) -> Vec<(Pubkey, AccountSharedData)> { + ) -> ScanResult> { self.accounts_db.scan_accounts( ancestors, + bank_id, |collector: &mut Vec<(Pubkey, AccountSharedData)>, some_account_tuple| { Self::load_while_filtering(collector, some_account_tuple, |account| { account.owner() == program_id && filter(account) @@ -718,12 +724,14 @@ impl Accounts { pub fn load_by_index_key_with_filter bool>( &self, ancestors: &Ancestors, + bank_id: BankId, index_key: &IndexKey, filter: F, - ) -> Vec<(Pubkey, AccountSharedData)> { + ) -> ScanResult> { self.accounts_db .index_scan_accounts( ancestors, + bank_id, *index_key, |collector: &mut Vec<(Pubkey, AccountSharedData)>, some_account_tuple| { Self::load_while_filtering(collector, some_account_tuple, |account| { @@ -731,16 +739,21 @@ impl Accounts { }) }, ) - .0 + .map(|result| result.0) } pub fn account_indexes_include_key(&self, key: &Pubkey) -> bool { self.accounts_db.account_indexes.include_key(key) } - pub fn load_all(&self, ancestors: &Ancestors) -> Vec<(Pubkey, AccountSharedData, Slot)> { + pub fn load_all( + &self, + ancestors: &Ancestors, + bank_id: BankId, + ) -> ScanResult> { self.accounts_db.scan_accounts( ancestors, + bank_id, |collector: &mut Vec<(Pubkey, AccountSharedData, Slot)>, some_account_tuple| { if let Some((pubkey, account, slot)) = some_account_tuple .filter(|(_, account, _)| Self::is_loadable(account.lamports())) @@ -931,8 +944,8 @@ impl Accounts { /// Purge a slot if it is not a root /// Root slots cannot be purged /// `is_from_abs` is true if the caller is the AccountsBackgroundService - pub fn purge_slot(&self, slot: Slot, is_from_abs: bool) { - self.accounts_db.purge_slot(slot, is_from_abs); + pub fn purge_slot(&self, slot: Slot, bank_id: BankId, is_from_abs: bool) { + self.accounts_db.purge_slot(slot, bank_id, is_from_abs); } /// Add a slot to root. 
Root slots cannot be purged @@ -2580,108 +2593,160 @@ mod tests { let all_pubkeys: HashSet<_> = vec![pubkey0, pubkey1, pubkey2].into_iter().collect(); // num == 0 should always return empty set + let bank_id = 0; assert_eq!( - accounts.load_largest_accounts( - &ancestors, - 0, - &HashSet::new(), - AccountAddressFilter::Exclude - ), + accounts + .load_largest_accounts( + &ancestors, + bank_id, + 0, + &HashSet::new(), + AccountAddressFilter::Exclude + ) + .unwrap(), vec![] ); assert_eq!( - accounts.load_largest_accounts( - &ancestors, - 0, - &all_pubkeys, - AccountAddressFilter::Include - ), + accounts + .load_largest_accounts( + &ancestors, + bank_id, + 0, + &all_pubkeys, + AccountAddressFilter::Include + ) + .unwrap(), vec![] ); // list should be sorted by balance, then pubkey, descending assert!(pubkey1 > pubkey0); assert_eq!( - accounts.load_largest_accounts( - &ancestors, - 1, - &HashSet::new(), - AccountAddressFilter::Exclude - ), + accounts + .load_largest_accounts( + &ancestors, + bank_id, + 1, + &HashSet::new(), + AccountAddressFilter::Exclude + ) + .unwrap(), vec![(pubkey1, 42)] ); assert_eq!( - accounts.load_largest_accounts( - &ancestors, - 2, - &HashSet::new(), - AccountAddressFilter::Exclude - ), + accounts + .load_largest_accounts( + &ancestors, + bank_id, + 2, + &HashSet::new(), + AccountAddressFilter::Exclude + ) + .unwrap(), vec![(pubkey1, 42), (pubkey0, 42)] ); assert_eq!( - accounts.load_largest_accounts( - &ancestors, - 3, - &HashSet::new(), - AccountAddressFilter::Exclude - ), + accounts + .load_largest_accounts( + &ancestors, + bank_id, + 3, + &HashSet::new(), + AccountAddressFilter::Exclude + ) + .unwrap(), vec![(pubkey1, 42), (pubkey0, 42), (pubkey2, 41)] ); // larger num should not affect results assert_eq!( - accounts.load_largest_accounts( - &ancestors, - 6, - &HashSet::new(), - AccountAddressFilter::Exclude - ), + accounts + .load_largest_accounts( + &ancestors, + bank_id, + 6, + &HashSet::new(), + AccountAddressFilter::Exclude + ) + .unwrap(), vec![(pubkey1, 42), (pubkey0, 42), (pubkey2, 41)] ); // AccountAddressFilter::Exclude should exclude entry let exclude1: HashSet<_> = vec![pubkey1].into_iter().collect(); assert_eq!( - accounts.load_largest_accounts(&ancestors, 1, &exclude1, AccountAddressFilter::Exclude), + accounts + .load_largest_accounts( + &ancestors, + bank_id, + 1, + &exclude1, + AccountAddressFilter::Exclude + ) + .unwrap(), vec![(pubkey0, 42)] ); assert_eq!( - accounts.load_largest_accounts(&ancestors, 2, &exclude1, AccountAddressFilter::Exclude), + accounts + .load_largest_accounts( + &ancestors, + bank_id, + 2, + &exclude1, + AccountAddressFilter::Exclude + ) + .unwrap(), vec![(pubkey0, 42), (pubkey2, 41)] ); assert_eq!( - accounts.load_largest_accounts(&ancestors, 3, &exclude1, AccountAddressFilter::Exclude), + accounts + .load_largest_accounts( + &ancestors, + bank_id, + 3, + &exclude1, + AccountAddressFilter::Exclude + ) + .unwrap(), vec![(pubkey0, 42), (pubkey2, 41)] ); // AccountAddressFilter::Include should limit entries let include1_2: HashSet<_> = vec![pubkey1, pubkey2].into_iter().collect(); assert_eq!( - accounts.load_largest_accounts( - &ancestors, - 1, - &include1_2, - AccountAddressFilter::Include - ), + accounts + .load_largest_accounts( + &ancestors, + bank_id, + 1, + &include1_2, + AccountAddressFilter::Include + ) + .unwrap(), vec![(pubkey1, 42)] ); assert_eq!( - accounts.load_largest_accounts( - &ancestors, - 2, - &include1_2, - AccountAddressFilter::Include - ), + accounts + .load_largest_accounts( + &ancestors, + bank_id, + 
2, + &include1_2, + AccountAddressFilter::Include + ) + .unwrap(), vec![(pubkey1, 42), (pubkey2, 41)] ); assert_eq!( - accounts.load_largest_accounts( - &ancestors, - 3, - &include1_2, - AccountAddressFilter::Include - ), + accounts + .load_largest_accounts( + &ancestors, + bank_id, + 3, + &include1_2, + AccountAddressFilter::Include + ) + .unwrap(), vec![(pubkey1, 42), (pubkey2, 41)] ); } diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index 0caa43d834..a321d35f5a 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -12,7 +12,10 @@ use crossbeam_channel::{Receiver, SendError, Sender}; use log::*; use rand::{thread_rng, Rng}; use solana_measure::measure::Measure; -use solana_sdk::{clock::Slot, hash::Hash}; +use solana_sdk::{ + clock::{BankId, Slot}, + hash::Hash, +}; use std::{ boxed::Box, fmt::{Debug, Formatter}, @@ -39,8 +42,8 @@ const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS: u64 = crate::accounts_db::EXPIRATI pub type SnapshotRequestSender = Sender; pub type SnapshotRequestReceiver = Receiver; -pub type DroppedSlotsSender = Sender; -pub type DroppedSlotsReceiver = Receiver; +pub type DroppedSlotsSender = Sender<(Slot, BankId)>; +pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>; #[derive(Clone)] pub struct SendDroppedBankCallback { @@ -49,7 +52,7 @@ pub struct SendDroppedBankCallback { impl DropCallback for SendDroppedBankCallback { fn callback(&self, bank: &Bank) { - if let Err(e) = self.sender.send(bank.slot()) { + if let Err(e) = self.sender.send((bank.slot(), bank.bank_id())) { warn!("Error sending dropped banks: {:?}", e); } } @@ -273,9 +276,11 @@ impl AbsRequestHandler { /// `is_from_abs` is true if the caller is the AccountsBackgroundService pub fn handle_pruned_banks(&self, bank: &Bank, is_from_abs: bool) -> usize { let mut count = 0; - for pruned_slot in self.pruned_banks_receiver.try_iter() { + for (pruned_slot, pruned_bank_id) in self.pruned_banks_receiver.try_iter() { count += 1; - bank.rc.accounts.purge_slot(pruned_slot, is_from_abs); + bank.rc + .accounts + .purge_slot(pruned_slot, pruned_bank_id, is_from_abs); } count @@ -465,7 +470,7 @@ mod test { &AccountSharedData::new(264, 0, &Pubkey::default()), ); assert!(bank0.get_account(&account_key).is_some()); - pruned_banks_sender.send(0).unwrap(); + pruned_banks_sender.send((0, 0)).unwrap(); assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty()); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 916502929f..de01135a24 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -24,7 +24,7 @@ use crate::{ accounts_hash::{AccountsHash, CalculateHashIntermediate, HashStats, PreviousPass}, accounts_index::{ AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexRootsStats, - IndexKey, IsCached, SlotList, SlotSlice, ZeroLamport, + IndexKey, IsCached, ScanResult, SlotList, SlotSlice, ZeroLamport, }, ancestors::Ancestors, append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion}, @@ -48,7 +48,7 @@ use solana_measure::measure::Measure; use solana_rayon_threadlimit::get_thread_count; use solana_sdk::{ account::{AccountSharedData, ReadableAccount}, - clock::{Epoch, Slot}, + clock::{BankId, Epoch, Slot}, genesis_config::ClusterType, hash::{Hash, Hasher}, pubkey::Pubkey, @@ -1506,7 +1506,7 @@ impl AccountsDb { }; if no_delete { let mut pending_store_ids: HashSet = HashSet::new(); - for (_slot_id, account_info) in account_infos { + 
for (_bank_id, account_info) in account_infos { if !already_counted.contains(&account_info.store_id) { pending_store_ids.insert(account_info.store_id); } @@ -2420,21 +2420,29 @@ impl AccountsDb { } } - pub fn scan_accounts(&self, ancestors: &Ancestors, scan_func: F) -> A + pub fn scan_accounts( + &self, + ancestors: &Ancestors, + bank_id: BankId, + scan_func: F, + ) -> ScanResult where F: Fn(&mut A, Option<(&Pubkey, AccountSharedData, Slot)>), A: Default, { let mut collector = A::default(); + + // This can error out if the slots being scanned over are aborted self.accounts_index - .scan_accounts(ancestors, |pubkey, (account_info, slot)| { + .scan_accounts(ancestors, bank_id, |pubkey, (account_info, slot)| { let account_slot = self .get_account_accessor(slot, pubkey, account_info.store_id, account_info.offset) .get_loaded_account() .map(|loaded_account| (pubkey, loaded_account.take_account(), slot)); scan_func(&mut collector, account_slot) - }); - collector + })?; + + Ok(collector) } pub fn unchecked_scan_accounts( @@ -2504,9 +2512,10 @@ impl AccountsDb { pub fn index_scan_accounts( &self, ancestors: &Ancestors, + bank_id: BankId, index_key: IndexKey, scan_func: F, - ) -> (A, bool) + ) -> ScanResult<(A, bool)> where F: Fn(&mut A, Option<(&Pubkey, AccountSharedData, Slot)>), A: Default, @@ -2519,12 +2528,14 @@ impl AccountsDb { if !self.account_indexes.include_key(key) { // the requested key was not indexed in the secondary index, so do a normal scan let used_index = false; - return (self.scan_accounts(ancestors, scan_func), used_index); + let scan_result = self.scan_accounts(ancestors, bank_id, scan_func)?; + return Ok((scan_result, used_index)); } let mut collector = A::default(); self.accounts_index.index_scan_accounts( ancestors, + bank_id, index_key, |pubkey, (account_info, slot)| { let account_slot = self @@ -2533,9 +2544,9 @@ impl AccountsDb { .map(|loaded_account| (pubkey, loaded_account.take_account(), slot)); scan_func(&mut collector, account_slot) }, - ); + )?; let used_index = true; - (collector, used_index) + Ok((collector, used_index)) } /// Scan a specific slot through all the account storage in parallel @@ -3284,14 +3295,29 @@ impl AccountsDb { SendDroppedBankCallback::new(pruned_banks_sender) } + /// This should only be called after the `Bank::drop()` runs in bank.rs, See BANK_DROP_SAFETY + /// comment below for more explanation. /// `is_from_abs` is true if the caller is the AccountsBackgroundService - pub fn purge_slot(&self, slot: Slot, is_from_abs: bool) { + pub fn purge_slot(&self, slot: Slot, bank_id: BankId, is_from_abs: bool) { if self.is_bank_drop_callback_enabled.load(Ordering::SeqCst) && !is_from_abs { panic!("bad drop callpath detected; Bank::drop() must run serially with other logic in ABS like clean_accounts()") } - let mut slots = HashSet::new(); - slots.insert(slot); - self.purge_slots(&slots); + // BANK_DROP_SAFETY: Because this function only runs once the bank is dropped, + // we know that there are no longer any ongoing scans on this bank, because scans require + // and hold a reference to the bank at the tip of the fork they're scanning. Hence it's + // safe to remove this bank_id from the `removed_bank_ids` list at this point. 
+        if self
+            .accounts_index
+            .removed_bank_ids
+            .lock()
+            .unwrap()
+            .remove(&bank_id)
+        {
+            // If this slot was already cleaned up, no need to do any further cleans
+            return;
+        }
+
+        self.purge_slots(std::iter::once(&slot));
     }

     fn recycle_slot_stores(
@@ -3325,7 +3351,7 @@ impl AccountsDb {
     /// Purges every slot in `removed_slots` from both the cache and storage. This includes
     /// entries in the accounts index, cache entries, and any backing storage entries.
     fn purge_slots_from_cache_and_store<'a>(
-        &'a self,
+        &self,
         removed_slots: impl Iterator<Item = &'a Slot>,
         purge_stats: &PurgeStats,
     ) {
@@ -3336,7 +3362,11 @@ impl AccountsDb {
             // This function is only currently safe with respect to `flush_slot_cache()` because
             // both functions run serially in AccountsBackgroundService.
             let mut remove_cache_elapsed = Measure::start("remove_cache_elapsed");
-            if let Some(slot_cache) = self.accounts_cache.remove_slot(*remove_slot) {
+            // Note: we cannot remove this slot from the slot cache until we've removed its
+            // entries from the accounts index first. This is because `scan_accounts()` relies on
+            // holding the index lock, finding the index entry, and then looking up the entry
+            // in the cache. If it fails to find that entry, it will panic in `get_loaded_account()`.
+            if let Some(slot_cache) = self.accounts_cache.slot_cache(*remove_slot) {
                 // If the slot is still in the cache, remove the backing storages for
                 // the slot and from the Accounts Index
                 num_cached_slots_removed += 1;
@@ -3344,6 +3374,8 @@ impl AccountsDb {
                 self.purge_slot_cache(*remove_slot, slot_cache);
                 remove_cache_elapsed.stop();
                 remove_cache_elapsed_across_slots += remove_cache_elapsed.as_us();
+                // Nobody else should have removed the slot cache entry yet
+                assert!(self.accounts_cache.remove_slot(*remove_slot).is_some());
             } else {
                 self.purge_slot_storage(*remove_slot, purge_stats);
             }
@@ -3531,39 +3563,33 @@ impl AccountsDb {
     }

     #[allow(clippy::needless_collect)]
-    fn purge_slots(&self, slots: &HashSet<Slot>) {
+    fn purge_slots<'a>(&self, slots: impl Iterator<Item = &'a Slot>) {
         // `add_root()` should be called first
         let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
-        let non_roots: Vec<&Slot> = slots
-            .iter()
-            .filter(|slot| !self.accounts_index.is_root(**slot))
-            .collect();
+        let non_roots = slots
+            // Only safe to check when there are duplicate versions of a slot,
+            // because ReplayStage will not make new roots before dumping the
+            // duplicate slots first. Thus we will not be in a case where we
+            // root slot `S`, then try to dump some other version of slot `S`;
+            // the dumping has to finish first.
+            //
+            // Also note roots are never removed via `remove_unrooted_slots()`, so
+            // it's safe to filter them out here as they won't need deletion from
+            // self.accounts_index.removed_bank_ids in `purge_slots_from_cache_and_store()`.
+            .filter(|slot| !self.accounts_index.is_root(**slot));
         safety_checks_elapsed.stop();
         self.external_purge_slots_stats
             .safety_checks_elapsed
             .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
-        self.purge_slots_from_cache_and_store(
-            non_roots.into_iter(),
-            &self.external_purge_slots_stats,
-        );
+        self.purge_slots_from_cache_and_store(non_roots, &self.external_purge_slots_stats);
         self.external_purge_slots_stats
             .report("external_purge_slots_stats", Some(1000));
     }

-    // TODO: This is currently unsafe with scan because it can remove a slot in the middle
    /// Remove the set of slots in `remove_slots` from both the cache and storage.
This requires - /// we know the contents of the slot are either: - /// - /// 1) Completely in the cache - /// 2) Have been completely flushed from the cache - /// - /// in order to guarantee that when this function returns, the contents of the slot have - /// been completely and not only partially removed. Thus synchronization with `flush_slot_cache()` - /// through `self.remove_unrooted_slots_synchronization` is necessary. - pub fn remove_unrooted_slots(&self, remove_slots: &[Slot]) { + pub fn remove_unrooted_slots(&self, remove_slots: &[(Slot, BankId)]) { let rooted_slots = self .accounts_index - .get_rooted_from_list(remove_slots.iter()); + .get_rooted_from_list(remove_slots.iter().map(|(slot, _)| slot)); assert!( rooted_slots.is_empty(), "Trying to remove accounts for rooted slots {:?}", @@ -3583,7 +3609,7 @@ impl AccountsDb { // we want to remove in this function let mut remaining_contended_flush_slots: Vec = remove_slots .iter() - .filter(|remove_slot| { + .filter_map(|(remove_slot, _)| { let is_being_flushed = currently_contended_slots.contains(remove_slot); if !is_being_flushed { // Reserve the slots that we want to purge that aren't currently @@ -3594,10 +3620,10 @@ impl AccountsDb { // before another version of the same slot can be replayed. This means // multiple threads should not call `remove_unrooted_slots()` simultaneously // with the same slot. - currently_contended_slots.insert(**remove_slot); + currently_contended_slots.insert(*remove_slot); } // If the cache is currently flushing this slot, add it to the list - is_being_flushed + Some(remove_slot).filter(|_| is_being_flushed) }) .cloned() .collect(); @@ -3630,13 +3656,26 @@ impl AccountsDb { } } + // Mark down these slots are about to be purged so that new attempts to scan these + // banks fail, and any ongoing scans over these slots will detect that they should abort + // their results + { + let mut locked_removed_bank_ids = self.accounts_index.removed_bank_ids.lock().unwrap(); + for (_slot, remove_bank_id) in remove_slots.iter() { + locked_removed_bank_ids.insert(*remove_bank_id); + } + } + let remove_unrooted_purge_stats = PurgeStats::default(); - self.purge_slots_from_cache_and_store(remove_slots.iter(), &remove_unrooted_purge_stats); + self.purge_slots_from_cache_and_store( + remove_slots.iter().map(|(slot, _)| slot), + &remove_unrooted_purge_stats, + ); remove_unrooted_purge_stats.report("remove_unrooted_slots_purge_slots_stats", Some(0)); let mut currently_contended_slots = slots_under_contention.lock().unwrap(); - for slot in remove_slots { - assert!(currently_contended_slots.remove(slot)); + for (remove_slot, _) in remove_slots { + assert!(currently_contended_slots.remove(remove_slot)); } } @@ -6945,6 +6984,7 @@ pub mod tests { fn run_test_remove_unrooted_slot(is_cached: bool) { let unrooted_slot = 9; + let unrooted_bank_id = 9; let mut db = AccountsDb::new(Vec::new(), &ClusterType::Development); db.caching_enabled = true; let key = Pubkey::default(); @@ -6966,7 +7006,7 @@ pub mod tests { assert_load_account(&db, unrooted_slot, key, 1); // Purge the slot - db.remove_unrooted_slots(&[unrooted_slot]); + db.remove_unrooted_slots(&[(unrooted_slot, unrooted_bank_id)]); assert!(db.load_without_fixed_root(&ancestors, &key).is_none()); assert!(db.bank_hashes.read().unwrap().get(&unrooted_slot).is_none()); assert!(db.accounts_cache.slot_cache(unrooted_slot).is_none()); @@ -6997,13 +7037,14 @@ pub mod tests { fn test_remove_unrooted_slot_snapshot() { solana_logger::setup(); let unrooted_slot = 9; + let unrooted_bank_id = 
9; let db = AccountsDb::new(Vec::new(), &ClusterType::Development); let key = solana_sdk::pubkey::new_rand(); let account0 = AccountSharedData::new(1, 0, &key); db.store_uncached(unrooted_slot, &[(&key, &account0)]); // Purge the slot - db.remove_unrooted_slots(&[unrooted_slot]); + db.remove_unrooted_slots(&[(unrooted_slot, unrooted_bank_id)]); // Add a new root let key2 = solana_sdk::pubkey::new_rand(); @@ -7628,11 +7669,13 @@ pub mod tests { // Secondary index should still find both pubkeys let mut found_accounts = HashSet::new(); let index_key = IndexKey::SplTokenMint(mint_key); + let bank_id = 0; accounts .accounts_index - .index_scan_accounts(&Ancestors::default(), index_key, |key, _| { + .index_scan_accounts(&Ancestors::default(), bank_id, index_key, |key, _| { found_accounts.insert(*key); - }); + }) + .unwrap(); assert_eq!(found_accounts.len(), 2); assert!(found_accounts.contains(&pubkey1)); assert!(found_accounts.contains(&pubkey2)); @@ -7643,13 +7686,16 @@ pub mod tests { keys: [mint_key].iter().cloned().collect::>(), }); // Secondary index can't be used - do normal scan: should still find both pubkeys - let found_accounts = accounts.index_scan_accounts( - &Ancestors::default(), - index_key, - |collection: &mut HashSet, account| { - collection.insert(*account.unwrap().0); - }, - ); + let found_accounts = accounts + .index_scan_accounts( + &Ancestors::default(), + bank_id, + index_key, + |collection: &mut HashSet, account| { + collection.insert(*account.unwrap().0); + }, + ) + .unwrap(); assert!(!found_accounts.1); assert_eq!(found_accounts.0.len(), 2); assert!(found_accounts.0.contains(&pubkey1)); @@ -7658,13 +7704,16 @@ pub mod tests { accounts.account_indexes.keys = None; // Secondary index can now be used since it isn't marked as excluded - let found_accounts = accounts.index_scan_accounts( - &Ancestors::default(), - index_key, - |collection: &mut HashSet, account| { - collection.insert(*account.unwrap().0); - }, - ); + let found_accounts = accounts + .index_scan_accounts( + &Ancestors::default(), + bank_id, + index_key, + |collection: &mut HashSet, account| { + collection.insert(*account.unwrap().0); + }, + ) + .unwrap(); assert!(found_accounts.1); assert_eq!(found_accounts.0.len(), 2); assert!(found_accounts.0.contains(&pubkey1)); @@ -7689,11 +7738,15 @@ pub mod tests { // Secondary index should have purged `pubkey1` as well let mut found_accounts = vec![]; - accounts.accounts_index.index_scan_accounts( - &Ancestors::default(), - IndexKey::SplTokenMint(mint_key), - |key, _| found_accounts.push(*key), - ); + accounts + .accounts_index + .index_scan_accounts( + &Ancestors::default(), + bank_id, + IndexKey::SplTokenMint(mint_key), + |key, _| found_accounts.push(*key), + ) + .unwrap(); assert_eq!(found_accounts, vec![pubkey2]); } @@ -10167,6 +10220,7 @@ pub mod tests { fn setup_scan( db: Arc, scan_ancestors: Arc, + bank_id: BankId, stall_key: Pubkey, ) -> ScanTracker { let exit = Arc::new(AtomicBool::new(false)); @@ -10179,6 +10233,7 @@ pub mod tests { .spawn(move || { db.scan_accounts( &scan_ancestors, + bank_id, |_collector: &mut Vec<(Pubkey, AccountSharedData)>, maybe_account| { ready_.store(true, Ordering::Relaxed); if let Some((pubkey, _, _)) = maybe_account { @@ -10193,7 +10248,8 @@ pub mod tests { } } }, - ); + ) + .unwrap(); }) .unwrap(); @@ -10239,7 +10295,8 @@ pub mod tests { let max_scan_root = 0; db.add_root(max_scan_root); let scan_ancestors: Arc = Arc::new(vec![(0, 1), (1, 1)].into_iter().collect()); - let scan_tracker = setup_scan(db.clone(), 
scan_ancestors.clone(), account_key2); + let bank_id = 0; + let scan_tracker = setup_scan(db.clone(), scan_ancestors.clone(), bank_id, account_key2); // Add a new root 2 let new_root = 2; @@ -10299,7 +10356,8 @@ pub mod tests { assert_eq!(account.0.lamports(), slot1_account.lamports()); // Simulate dropping the bank, which finally removes the slot from the cache - db.purge_slot(1, false); + let bank_id = 1; + db.purge_slot(1, bank_id, false); assert!(db .do_load( &scan_ancestors, @@ -10404,7 +10462,13 @@ pub mod tests { accounts_db.add_root(*slot as Slot); if Some(*slot) == scan_slot { let ancestors = Arc::new(vec![(stall_slot, 1), (*slot, 1)].into_iter().collect()); - scan_tracker = Some(setup_scan(accounts_db.clone(), ancestors, scan_stall_key)); + let bank_id = 0; + scan_tracker = Some(setup_scan( + accounts_db.clone(), + ancestors, + bank_id, + scan_stall_key, + )); assert_eq!( accounts_db.accounts_index.min_ongoing_scan_root().unwrap(), *slot @@ -11237,7 +11301,12 @@ pub mod tests { account.set_lamports(lamports); // Pick random 50% of the slots to pass to `remove_unrooted_slots()` - let mut all_slots: Vec = (0..num_cached_slots).collect(); + let mut all_slots: Vec<(Slot, BankId)> = (0..num_cached_slots) + .map(|slot| { + let bank_id = slot + 1; + (slot, bank_id) + }) + .collect(); all_slots.shuffle(&mut rand::thread_rng()); let slots_to_dump = &all_slots[0..num_cached_slots as usize / 2]; let slots_to_keep = &all_slots[num_cached_slots as usize / 2..]; @@ -11270,7 +11339,7 @@ pub mod tests { // Check that all the slots in `slots_to_dump` were completely removed from the // cache, storage, and index - for slot in slots_to_dump { + for (slot, _) in slots_to_dump { assert!(db.storage.get_slot_storage_entries(*slot).is_none()); assert!(db.accounts_cache.slot_cache(*slot).is_none()); let account_in_slot = slot_to_pubkey_map[slot]; @@ -11283,7 +11352,7 @@ pub mod tests { // Wait for flush to finish before starting next trial flush_done_receiver.recv().unwrap(); - for slot in slots_to_keep { + for (slot, bank_id) in slots_to_keep { let account_in_slot = slot_to_pubkey_map[slot]; assert!(db .load( @@ -11294,7 +11363,7 @@ pub mod tests { .is_some()); // Clear for next iteration so that `assert!(self.storage.get_slot_stores(purged_slot).is_none());` // in `purge_slot_pubkeys()` doesn't trigger - db.remove_unrooted_slots(&[*slot]); + db.remove_unrooted_slots(&[(*slot, *bank_id)]); } } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 37a2f670e8..b6e402e2ae 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -10,7 +10,7 @@ use log::*; use ouroboros::self_referencing; use solana_measure::measure::Measure; use solana_sdk::{ - clock::Slot, + clock::{BankId, Slot}, pubkey::{Pubkey, PUBKEY_BYTES}, }; use std::{ @@ -25,15 +25,16 @@ use std::{ }, sync::{ atomic::{AtomicU64, Ordering}, - Arc, RwLock, RwLockReadGuard, RwLockWriteGuard, + Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, }, }; +use thiserror::Error; pub const ITER_BATCH_SIZE: usize = 1000; +pub type ScanResult = Result; pub type SlotList = Vec<(Slot, T)>; pub type SlotSlice<'s, T> = &'s [(Slot, T)]; - pub type RefCount = u64; pub type AccountMap = BTreeMap; @@ -55,6 +56,12 @@ impl IsCached for u64 { } } +#[derive(Error, Debug, PartialEq)] +pub enum ScanError { + #[error("Node detected it replayed bad version of slot {slot:?} with id {bank_id:?}, thus the scan on said slot was aborted")] + SlotRemoved { slot: Slot, bank_id: BankId }, +} + enum ScanTypes> { Unindexed(Option), 
Indexed(IndexKey), @@ -458,6 +465,10 @@ impl RollingBitField { std::mem::swap(&mut n, self); } + pub fn max(&self) -> u64 { + self.max + } + pub fn get_all(&self) -> Vec { let mut all = Vec::with_capacity(self.count); self.excess.iter().for_each(|slot| all.push(*slot)); @@ -584,6 +595,22 @@ type MapType = AccountMap>; type AccountMapsWriteLock<'a, T> = RwLockWriteGuard<'a, AccountMap>>; type AccountMapsReadLock<'a, T> = RwLockReadGuard<'a, AccountMap>>; +#[derive(Debug, Default)] +pub struct ScanSlotTracker { + is_removed: bool, + ref_count: u64, +} + +impl ScanSlotTracker { + pub fn is_removed(&self) -> bool { + self.is_removed + } + + pub fn mark_removed(&mut self) { + self.is_removed = true; + } +} + #[derive(Debug)] pub struct AccountsIndex { pub account_maps: RwLock>, @@ -592,6 +619,17 @@ pub struct AccountsIndex { spl_token_owner_index: SecondaryIndex, roots_tracker: RwLock, ongoing_scan_roots: RwLock>, + // Each scan has some latest slot `S` that is the tip of the fork the scan + // is iterating over. The unique id of that slot `S` is recorded here (note we don't use + // `S` as the id because there can be more than one version of a slot `S`). If a fork + // is abandoned, all of the slots on that fork up to `S` will be removed via + // `AccountsDb::remove_unrooted_slots()`. When the scan finishes, it'll realize that the + // results of the scan may have been corrupted by `remove_unrooted_slots` and abort its results. + // + // `removed_bank_ids` tracks all the slot ids that were removed via `remove_unrooted_slots()` so any attempted scans + // on any of these slots fails. This is safe to purge once the associated Bank is dropped and + // scanning the fork with that Bank at the tip is no longer possible. + pub removed_bank_ids: Mutex>, zero_lamport_pubkeys: DashSet, } @@ -610,6 +648,7 @@ impl Default for AccountsIndex { ), roots_tracker: RwLock::::default(), ongoing_scan_roots: RwLock::>::default(), + removed_bank_ids: Mutex::>::default(), zero_lamport_pubkeys: DashSet::::default(), } } @@ -627,12 +666,24 @@ impl AccountsIndex { &self, metric_name: &'static str, ancestors: &Ancestors, + scan_bank_id: BankId, func: F, scan_type: ScanTypes, - ) where + ) -> Result<(), ScanError> + where F: FnMut(&Pubkey, (&T, Slot)), R: RangeBounds, { + { + let locked_removed_bank_ids = self.removed_bank_ids.lock().unwrap(); + if locked_removed_bank_ids.contains(&scan_bank_id) { + return Err(ScanError::SlotRemoved { + slot: ancestors.max_slot(), + bank_id: scan_bank_id, + }); + } + } + let max_root = { let mut w_ongoing_scan_roots = self // This lock is also grabbed by clean_accounts(), so clean @@ -809,6 +860,23 @@ impl AccountsIndex { ongoing_scan_roots.remove(&max_root); } } + + // If the fork with tip at bank `scan_bank_id` was removed during our scan, then the scan + // may have been corrupted, so abort the results. 
+ let was_scan_corrupted = self + .removed_bank_ids + .lock() + .unwrap() + .contains(&scan_bank_id); + + if was_scan_corrupted { + Err(ScanError::SlotRemoved { + slot: ancestors.max_slot(), + bank_id: scan_bank_id, + }) + } else { + Ok(()) + } } fn do_unchecked_scan_accounts( @@ -1008,7 +1076,12 @@ impl AccountsIndex { } /// call func with every pubkey and index visible from a given set of ancestors - pub(crate) fn scan_accounts(&self, ancestors: &Ancestors, func: F) + pub(crate) fn scan_accounts( + &self, + ancestors: &Ancestors, + scan_bank_id: BankId, + func: F, + ) -> Result<(), ScanError> where F: FnMut(&Pubkey, (&T, Slot)), { @@ -1016,9 +1089,10 @@ impl AccountsIndex { self.do_checked_scan_accounts( "", ancestors, + scan_bank_id, func, ScanTypes::Unindexed(None::>), - ); + ) } pub(crate) fn unchecked_scan_accounts( @@ -1048,7 +1122,13 @@ impl AccountsIndex { } /// call func with every pubkey and index visible from a given set of ancestors - pub(crate) fn index_scan_accounts(&self, ancestors: &Ancestors, index_key: IndexKey, func: F) + pub(crate) fn index_scan_accounts( + &self, + ancestors: &Ancestors, + scan_bank_id: BankId, + index_key: IndexKey, + func: F, + ) -> Result<(), ScanError> where F: FnMut(&Pubkey, (&T, Slot)), { @@ -1056,9 +1136,10 @@ impl AccountsIndex { self.do_checked_scan_accounts( "", ancestors, + scan_bank_id, func, ScanTypes::>::Indexed(index_key), - ); + ) } pub fn get_rooted_entries(&self, slice: SlotSlice, max: Option) -> SlotList { diff --git a/runtime/src/ancestors.rs b/runtime/src/ancestors.rs index 048e07cdb4..0f6bafba85 100644 --- a/runtime/src/ancestors.rs +++ b/runtime/src/ancestors.rs @@ -75,6 +75,10 @@ impl Ancestors { pub fn is_empty(&self) -> bool { self.len() == 0 } + + pub fn max_slot(&self) -> Slot { + self.ancestors.max() - 1 + } } #[cfg(test)] pub mod tests { diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index bbeb2399c7..357eba10b8 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -39,7 +39,7 @@ use crate::{ TransactionLoadResult, TransactionLoaders, }, accounts_db::{AccountShrinkThreshold, ErrorCounters, SnapshotStorages}, - accounts_index::{AccountSecondaryIndexes, IndexKey}, + accounts_index::{AccountSecondaryIndexes, IndexKey, ScanResult}, ancestors::{Ancestors, AncestorsForSerialization}, blockhash_queue::BlockhashQueue, builtins::{self, ActivationType}, @@ -68,7 +68,7 @@ use solana_sdk::{ AccountSharedData, InheritableAccountFields, ReadableAccount, WritableAccount, }, clock::{ - Epoch, Slot, SlotCount, SlotIndex, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, + BankId, Epoch, Slot, SlotCount, SlotIndex, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, INITIAL_RENT_EPOCH, MAX_PROCESSING_AGE, MAX_RECENT_BLOCKHASHES, MAX_TRANSACTION_FORWARDING_DELAY, SECONDS_PER_DAY, }, @@ -416,6 +416,8 @@ pub struct BankRc { /// Current slot pub(crate) slot: Slot, + + pub(crate) bank_id_generator: Arc, } #[cfg(RUSTC_WITH_SPECIALIZATION)] @@ -430,6 +432,7 @@ impl AbiExample for BankRc { // AbiExample for Accounts is specially implemented to contain a storage example accounts: AbiExample::example(), slot: AbiExample::example(), + bank_id_generator: Arc::new(AtomicU64::new(0)), } } } @@ -440,6 +443,7 @@ impl BankRc { accounts: Arc::new(accounts), parent: RwLock::new(None), slot, + bank_id_generator: Arc::new(AtomicU64::new(0)), } } @@ -909,6 +913,8 @@ pub struct Bank { /// Bank slot (i.e. 
block) slot: Slot, + bank_id: BankId, + /// Bank epoch epoch: Epoch, @@ -965,8 +971,6 @@ pub struct Bank { /// Protocol-level rewards that were distributed by this bank pub rewards: RwLock>, - pub skip_drop: AtomicBool, - pub cluster_type: Option, pub lazy_rent_collection: AtomicBool, @@ -1131,6 +1135,7 @@ impl Bank { )), parent: RwLock::new(Some(parent.clone())), slot, + bank_id_generator: parent.rc.bank_id_generator.clone(), }; let src = StatusCacheRc { status_cache: parent.src.status_cache.clone(), @@ -1139,10 +1144,12 @@ impl Bank { let fee_rate_governor = FeeRateGovernor::new_derived(&parent.fee_rate_governor, parent.signature_count()); + let bank_id = rc.bank_id_generator.fetch_add(1, Relaxed) + 1; let mut new = Bank { rc, src, slot, + bank_id, epoch, blockhash_queue: RwLock::new(parent.blockhash_queue.read().unwrap().clone()), @@ -1184,7 +1191,6 @@ impl Bank { hard_forks: parent.hard_forks.clone(), last_vote_sync: AtomicU64::new(parent.last_vote_sync.load(Relaxed)), rewards: RwLock::new(vec![]), - skip_drop: AtomicBool::new(false), cluster_type: parent.cluster_type, lazy_rent_collection: AtomicBool::new(parent.lazy_rent_collection.load(Relaxed)), no_stake_rewrite: AtomicBool::new(parent.no_stake_rewrite.load(Relaxed)), @@ -1322,6 +1328,7 @@ impl Bank { slots_per_year: fields.slots_per_year, unused: genesis_config.unused, slot: fields.slot, + bank_id: 0, epoch: fields.epoch, block_height: fields.block_height, collector_id: fields.collector_id, @@ -1341,7 +1348,6 @@ impl Bank { feature_builtins: new(), last_vote_sync: new(), rewards: new(), - skip_drop: new(), cluster_type: Some(genesis_config.cluster_type), lazy_rent_collection: new(), no_stake_rewrite: new(), @@ -1441,6 +1447,10 @@ impl Bank { self.slot } + pub fn bank_id(&self) -> BankId { + self.bank_id + } + pub fn epoch(&self) -> Epoch { self.epoch } @@ -2658,7 +2668,7 @@ impl Bank { } } - pub fn remove_unrooted_slots(&self, slots: &[Slot]) { + pub fn remove_unrooted_slots(&self, slots: &[(Slot, BankId)]) { self.rc.accounts.accounts_db.remove_unrooted_slots(slots) } @@ -4335,38 +4345,49 @@ impl Bank { .map(|(acc, _slot)| acc) } - pub fn get_program_accounts(&self, program_id: &Pubkey) -> Vec<(Pubkey, AccountSharedData)> { + pub fn get_program_accounts( + &self, + program_id: &Pubkey, + ) -> ScanResult> { self.rc .accounts - .load_by_program(&self.ancestors, program_id) + .load_by_program(&self.ancestors, self.bank_id, program_id) } pub fn get_filtered_program_accounts bool>( &self, program_id: &Pubkey, filter: F, - ) -> Vec<(Pubkey, AccountSharedData)> { - self.rc - .accounts - .load_by_program_with_filter(&self.ancestors, program_id, filter) + ) -> ScanResult> { + self.rc.accounts.load_by_program_with_filter( + &self.ancestors, + self.bank_id, + program_id, + filter, + ) } pub fn get_filtered_indexed_accounts bool>( &self, index_key: &IndexKey, filter: F, - ) -> Vec<(Pubkey, AccountSharedData)> { - self.rc - .accounts - .load_by_index_key_with_filter(&self.ancestors, index_key, filter) + ) -> ScanResult> { + self.rc.accounts.load_by_index_key_with_filter( + &self.ancestors, + self.bank_id, + index_key, + filter, + ) } pub fn account_indexes_include_key(&self, key: &Pubkey) -> bool { self.rc.accounts.account_indexes_include_key(key) } - pub fn get_all_accounts_with_modified_slots(&self) -> Vec<(Pubkey, AccountSharedData, Slot)> { - self.rc.accounts.load_all(&self.ancestors) + pub fn get_all_accounts_with_modified_slots( + &self, + ) -> ScanResult> { + self.rc.accounts.load_all(&self.ancestors, self.bank_id) } pub fn 
get_program_accounts_modified_since_parent( @@ -4421,10 +4442,14 @@ impl Bank { num: usize, filter_by_address: &HashSet, filter: AccountAddressFilter, - ) -> Vec<(Pubkey, u64)> { - self.rc - .accounts - .load_largest_accounts(&self.ancestors, num, filter_by_address, filter) + ) -> ScanResult> { + self.rc.accounts.load_largest_accounts( + &self.ancestors, + self.bank_id, + num, + filter_by_address, + filter, + ) } pub fn transaction_count(&self) -> u64 { @@ -5255,10 +5280,6 @@ impl Bank { impl Drop for Bank { fn drop(&mut self) { - if self.skip_drop.load(Relaxed) { - return; - } - if let Some(drop_callback) = self.drop_callback.read().unwrap().0.as_ref() { drop_callback.callback(self); } else { @@ -5266,7 +5287,9 @@ impl Drop for Bank { // 1. Tests // 2. At startup when replaying blockstore and there's no // AccountsBackgroundService to perform cleanups yet. - self.rc.accounts.purge_slot(self.slot(), false); + self.rc + .accounts + .purge_slot(self.slot(), self.bank_id(), false); } } } @@ -5307,7 +5330,9 @@ pub(crate) mod tests { use crate::{ accounts_background_service::{AbsRequestHandler, SendDroppedBankCallback}, accounts_db::DEFAULT_ACCOUNTS_SHRINK_RATIO, - accounts_index::{AccountIndex, AccountMap, AccountSecondaryIndexes, ITER_BATCH_SIZE}, + accounts_index::{ + AccountIndex, AccountMap, AccountSecondaryIndexes, ScanError, ITER_BATCH_SIZE, + }, ancestors::Ancestors, genesis_utils::{ activate_all_features, bootstrap_validator_stake_lamports, @@ -9145,7 +9170,7 @@ pub(crate) mod tests { let parent = Arc::new(Bank::new(&genesis_config)); parent.restore_old_behavior_for_fragile_tests(); - let genesis_accounts: Vec<_> = parent.get_all_accounts_with_modified_slots(); + let genesis_accounts: Vec<_> = parent.get_all_accounts_with_modified_slots().unwrap(); assert!( genesis_accounts .iter() @@ -9173,11 +9198,11 @@ pub(crate) mod tests { let bank1 = Arc::new(new_from_parent(&bank0)); bank1.squash(); assert_eq!( - bank0.get_program_accounts(&program_id), + bank0.get_program_accounts(&program_id).unwrap(), vec![(pubkey0, account0.clone())] ); assert_eq!( - bank1.get_program_accounts(&program_id), + bank1.get_program_accounts(&program_id).unwrap(), vec![(pubkey0, account0)] ); assert_eq!( @@ -9196,8 +9221,8 @@ pub(crate) mod tests { let bank3 = Arc::new(new_from_parent(&bank2)); bank3.squash(); - assert_eq!(bank1.get_program_accounts(&program_id).len(), 2); - assert_eq!(bank3.get_program_accounts(&program_id).len(), 2); + assert_eq!(bank1.get_program_accounts(&program_id).unwrap().len(), 2); + assert_eq!(bank3.get_program_accounts(&program_id).unwrap().len(), 2); } #[test] @@ -9217,8 +9242,9 @@ pub(crate) mod tests { let account = AccountSharedData::new(1, 0, &program_id); bank.store_account(&address, &account); - let indexed_accounts = - bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |_| true); + let indexed_accounts = bank + .get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |_| true) + .unwrap(); assert_eq!(indexed_accounts.len(), 1); assert_eq!(indexed_accounts[0], (address, account)); @@ -9229,12 +9255,14 @@ pub(crate) mod tests { let new_account = AccountSharedData::new(1, 0, &another_program_id); let bank = Arc::new(new_from_parent(&bank)); bank.store_account(&address, &new_account); - let indexed_accounts = - bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |_| true); + let indexed_accounts = bank + .get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |_| true) + .unwrap(); assert_eq!(indexed_accounts.len(), 1); 
assert_eq!(indexed_accounts[0], (address, new_account.clone())); - let indexed_accounts = - bank.get_filtered_indexed_accounts(&IndexKey::ProgramId(another_program_id), |_| true); + let indexed_accounts = bank + .get_filtered_indexed_accounts(&IndexKey::ProgramId(another_program_id), |_| true) + .unwrap(); assert_eq!(indexed_accounts.len(), 1); assert_eq!(indexed_accounts[0], (address, new_account.clone())); @@ -9242,12 +9270,14 @@ pub(crate) mod tests { let indexed_accounts = bank .get_filtered_indexed_accounts(&IndexKey::ProgramId(program_id), |account| { account.owner() == &program_id - }); + }) + .unwrap(); assert!(indexed_accounts.is_empty()); let indexed_accounts = bank .get_filtered_indexed_accounts(&IndexKey::ProgramId(another_program_id), |account| { account.owner() == &another_program_id - }); + }) + .unwrap(); assert_eq!(indexed_accounts.len(), 1); assert_eq!(indexed_accounts[0], (address, new_account)); } @@ -11895,13 +11925,27 @@ pub(crate) mod tests { assert!(!debug.is_empty()); } + #[derive(Debug)] + enum AcceptableScanResults { + DroppedSlotError, + NoFailure, + Both, + } + fn test_store_scan_consistency( accounts_db_caching_enabled: bool, update_f: F, drop_callback: Option>, + acceptable_scan_results: AcceptableScanResults, ) where - F: Fn(Arc, crossbeam_channel::Sender>, Arc>, Pubkey, u64) - + std::marker::Send, + F: Fn( + Arc, + crossbeam_channel::Sender>, + crossbeam_channel::Receiver, + Arc>, + Pubkey, + u64, + ) + std::marker::Send, { // Set up initial bank let mut genesis_config = create_genesis_config_with_leader( @@ -11946,48 +11990,83 @@ pub(crate) mod tests { // Thread that runs scan and constantly checks for // consistency let pubkeys_to_modify_ = pubkeys_to_modify.clone(); - let exit_ = exit.clone(); // Channel over which the bank to scan is sent let (bank_to_scan_sender, bank_to_scan_receiver): ( crossbeam_channel::Sender>, crossbeam_channel::Receiver>, ) = bounded(1); - let scan_thread = Builder::new() - .name("scan".to_string()) - .spawn(move || loop { - if exit_.load(Relaxed) { - return; - } - if let Ok(bank_to_scan) = - bank_to_scan_receiver.recv_timeout(Duration::from_millis(10)) - { - let accounts = bank_to_scan.get_program_accounts(&program_id); - // Should never see empty accounts because no slot ever deleted - // any of the original accounts, and the scan should reflect the - // account state at some frozen slot `X` (no partial updates). 
- assert!(!accounts.is_empty()); - let mut expected_lamports = None; - let mut target_accounts_found = HashSet::new(); - for (pubkey, account) in accounts { - let account_balance = account.lamports(); - if pubkeys_to_modify_.contains(&pubkey) { - target_accounts_found.insert(pubkey); - if let Some(expected_lamports) = expected_lamports { - assert_eq!(account_balance, expected_lamports); - } else { - // All pubkeys in the specified set should have the same balance - expected_lamports = Some(account_balance); + + let (scan_finished_sender, scan_finished_receiver): ( + crossbeam_channel::Sender, + crossbeam_channel::Receiver, + ) = unbounded(); + let num_banks_scanned = Arc::new(AtomicU64::new(0)); + let scan_thread = { + let exit = exit.clone(); + let num_banks_scanned = num_banks_scanned.clone(); + Builder::new() + .name("scan".to_string()) + .spawn(move || { + loop { + info!("starting scan iteration"); + if exit.load(Relaxed) { + info!("scan exiting"); + return; + } + if let Ok(bank_to_scan) = + bank_to_scan_receiver.recv_timeout(Duration::from_millis(10)) + { + info!("scanning program accounts for slot {}", bank_to_scan.slot()); + let accounts_result = bank_to_scan.get_program_accounts(&program_id); + let _ = scan_finished_sender.send(bank_to_scan.bank_id()); + num_banks_scanned.fetch_add(1, Relaxed); + match (&acceptable_scan_results, accounts_result.is_err()) { + (AcceptableScanResults::DroppedSlotError, _) + | (AcceptableScanResults::Both, true) => { + assert_eq!( + accounts_result, + Err(ScanError::SlotRemoved { + slot: bank_to_scan.slot(), + bank_id: bank_to_scan.bank_id() + }) + ); + } + (AcceptableScanResults::NoFailure, _) + | (AcceptableScanResults::Both, false) => { + assert!(accounts_result.is_ok()) + } + } + + // Should never see empty accounts because no slot ever deleted + // any of the original accounts, and the scan should reflect the + // account state at some frozen slot `X` (no partial updates). + if let Ok(accounts) = accounts_result { + assert!(!accounts.is_empty()); + let mut expected_lamports = None; + let mut target_accounts_found = HashSet::new(); + for (pubkey, account) in accounts { + let account_balance = account.lamports(); + if pubkeys_to_modify_.contains(&pubkey) { + target_accounts_found.insert(pubkey); + if let Some(expected_lamports) = expected_lamports { + assert_eq!(account_balance, expected_lamports); + } else { + // All pubkeys in the specified set should have the same balance + expected_lamports = Some(account_balance); + } + } + } + + // Should've found all the accounts, i.e. no partial cleans should + // be detected + assert_eq!(target_accounts_found.len(), total_pubkeys_to_modify); } } } - - // Should've found all the accounts, i.e. 
@@ -11997,6 +12076,7 @@ pub(crate) mod tests {
                 update_f(
                     bank0,
                     bank_to_scan_sender,
+                    scan_finished_receiver,
                     pubkeys_to_modify,
                     program_id,
                     starting_lamports,
@@ -12005,7 +12085,15 @@
             .unwrap();
 
         // Let threads run for a while, check the scans didn't see any mixed slots
+        let min_expected_number_of_scans = 5;
         std::thread::sleep(Duration::new(5, 0));
+        loop {
+            if num_banks_scanned.load(Relaxed) > min_expected_number_of_scans {
+                break;
+            } else {
+                std::thread::sleep(Duration::from_millis(100));
+            }
+        }
         exit.store(true, Relaxed);
         scan_thread.join().unwrap();
         update_thread.join().unwrap();
@@ -12023,6 +12111,7 @@
                 *accounts_db_caching_enabled,
                 move |bank0,
                       bank_to_scan_sender,
+                      _scan_finished_receiver,
                       pubkeys_to_modify,
                       program_id,
                       starting_lamports| {
@@ -12107,6 +12196,7 @@
                 Some(Box::new(SendDroppedBankCallback::new(
                     pruned_banks_sender.clone(),
                 ))),
+                AcceptableScanResults::NoFailure,
             )
         }
     }
@@ -12116,7 +12206,12 @@
         for accounts_db_caching_enabled in &[false, true] {
             test_store_scan_consistency(
                 *accounts_db_caching_enabled,
-                |bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| {
+                |bank0,
+                 bank_to_scan_sender,
+                 _scan_finished_receiver,
+                 pubkeys_to_modify,
+                 program_id,
+                 starting_lamports| {
                     let mut current_bank = bank0.clone();
                     let mut prev_bank = bank0;
                     loop {
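Note: the big hunk below adds `setup_banks_on_fork_to_remove`, which builds a disposable fork keyed by `(Slot, BankId)` pairs. The pair matters because a slot number can be reused after `remove_unrooted_slots`, while a bank id never is (per the new `BankId` doc in clock.rs at the end of this diff). A sketch under that assumption, given a `bank0: Arc<Bank>` root as in these tests:

```rust
// Two banks at the same slot number are distinct bank versions.
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
let first_version = (bank1.slot(), bank1.bank_id());
bank1.remove_unrooted_slots(&[first_version]);

// Recreate slot 1: same Slot, fresh BankId.
let bank1_again = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
assert_eq!(bank1_again.slot(), first_version.0);
assert_ne!(bank1_again.bank_id(), first_version.1);
```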
@@ -12151,6 +12246,242 @@
                     }
                 },
                 None,
+                AcceptableScanResults::NoFailure,
             );
         }
     }
+
+    fn setup_banks_on_fork_to_remove(
+        bank0: Arc<Bank>,
+        pubkeys_to_modify: Arc<HashSet<Pubkey>>,
+        program_id: &Pubkey,
+        starting_lamports: u64,
+        num_banks_on_fork: usize,
+        step_size: usize,
+    ) -> (Arc<Bank>, Vec<(Slot, BankId)>, Ancestors) {
+        // Need at least 2 keys to create inconsistency in account balances when deleting
+        // slots
+        assert!(pubkeys_to_modify.len() > 1);
+
+        // Tracks the bank at the tip of the to-be-created fork
+        let mut bank_at_fork_tip = bank0;
+
+        // All the slots on the fork, except slot 0
+        let mut slots_on_fork = Vec::with_capacity(num_banks_on_fork);
+
+        // All accounts in each set of `step_size` slots will have the same account balances.
+        // The account balances change every `step_size` banks. Thus, if you delete any one
+        // of the latest `step_size` slots, you will see varying account balances when
+        // loading the accounts.
+        assert!(num_banks_on_fork >= 2);
+        assert!(step_size >= 2);
+        let pubkeys_to_modify: Vec<Pubkey> = pubkeys_to_modify.iter().cloned().collect();
+        let pubkeys_to_modify_per_slot = (pubkeys_to_modify.len() / step_size).max(1);
+        for _ in (0..num_banks_on_fork).step_by(step_size) {
+            let mut lamports_this_round = 0;
+            for i in 0..step_size {
+                bank_at_fork_tip = Arc::new(Bank::new_from_parent(
+                    &bank_at_fork_tip,
+                    &solana_sdk::pubkey::new_rand(),
+                    bank_at_fork_tip.slot() + 1,
+                ));
+                if lamports_this_round == 0 {
+                    lamports_this_round = bank_at_fork_tip.bank_id() + starting_lamports + 1;
+                }
+                let pubkey_to_modify_starting_index = i * pubkeys_to_modify_per_slot;
+                let account = AccountSharedData::new(lamports_this_round, 0, program_id);
+                for pubkey_index_to_modify in pubkey_to_modify_starting_index
+                    ..pubkey_to_modify_starting_index + pubkeys_to_modify_per_slot
+                {
+                    let key = pubkeys_to_modify[pubkey_index_to_modify % pubkeys_to_modify.len()];
+                    bank_at_fork_tip.store_account(&key, &account);
+                }
+                bank_at_fork_tip.freeze();
+                slots_on_fork.push((bank_at_fork_tip.slot(), bank_at_fork_tip.bank_id()));
+            }
+        }
+
+        let ancestors: Vec<(Slot, usize)> = slots_on_fork.iter().map(|(s, _)| (*s, 0)).collect();
+        let ancestors = Ancestors::from(ancestors);
+
+        (bank_at_fork_tip, slots_on_fork, ancestors)
+    }
+
+    #[test]
+    fn test_remove_unrooted_before_scan() {
+        for accounts_db_caching_enabled in &[false, true] {
+            test_store_scan_consistency(
+                *accounts_db_caching_enabled,
+                |bank0,
+                 bank_to_scan_sender,
+                 scan_finished_receiver,
+                 pubkeys_to_modify,
+                 program_id,
+                 starting_lamports| {
+                    loop {
+                        let (bank_at_fork_tip, slots_on_fork, ancestors) =
+                            setup_banks_on_fork_to_remove(
+                                bank0.clone(),
+                                pubkeys_to_modify.clone(),
+                                &program_id,
+                                starting_lamports,
+                                10,
+                                2,
+                            );
+                        // Remove the fork before the scan starts; this should cause a
+                        // `SlotRemoved` error every time
+                        for k in pubkeys_to_modify.iter() {
+                            assert!(bank_at_fork_tip.load_slow(&ancestors, k).is_some());
+                        }
+                        bank_at_fork_tip.remove_unrooted_slots(&slots_on_fork);
+
+                        // Accounts on this fork should not be found after removal
+                        for k in pubkeys_to_modify.iter() {
+                            assert!(bank_at_fork_tip.load_slow(&ancestors, k).is_none());
+                        }
+                        if bank_to_scan_sender.send(bank_at_fork_tip.clone()).is_err() {
+                            return;
+                        }
+
+                        // Wait for the scan to finish before starting the next iteration
+                        let finished_scan_bank_id = scan_finished_receiver.recv();
+                        if finished_scan_bank_id.is_err() {
+                            return;
+                        }
+                        assert_eq!(finished_scan_bank_id.unwrap(), bank_at_fork_tip.bank_id());
+                    }
+                },
+                None,
+                // Removing the slots before the scan starts should error every time
+                AcceptableScanResults::DroppedSlotError,
+            );
+        }
+    }
+
+    #[test]
+    fn test_remove_unrooted_scan_then_recreate_same_slot_before_scan() {
+        for accounts_db_caching_enabled in &[false, true] {
+            test_store_scan_consistency(
+                *accounts_db_caching_enabled,
+                |bank0,
+                 bank_to_scan_sender,
+                 scan_finished_receiver,
+                 pubkeys_to_modify,
+                 program_id,
+                 starting_lamports| {
+                    let mut prev_bank = bank0.clone();
+                    loop {
+                        let start = Instant::now();
+                        let (bank_at_fork_tip, slots_on_fork, ancestors) =
+                            setup_banks_on_fork_to_remove(
+                                bank0.clone(),
+                                pubkeys_to_modify.clone(),
+                                &program_id,
+                                starting_lamports,
+                                10,
+                                2,
+                            );
+                        info!("setting up banks elapsed: {}", start.elapsed().as_millis());
+                        // Remove the fork at the end of each iteration; the next iteration
+                        // recreates the slots, and only after they've been recreated do we
+                        // send the old bank for scanning.
+                        // Skip scanning bank 0 on the first iteration of the loop, since
+                        // those accounts aren't being removed
+                        if prev_bank.slot() != 0 {
+                            info!(
+                                "sending bank with slot: {:?}, elapsed: {}",
+                                prev_bank.slot(),
+                                start.elapsed().as_millis()
+                            );
+                            // Although we dumped the slots last iteration via
+                            // `remove_unrooted_slots()`, we've recreated those slots this
+                            // iteration, so they should be findable again
+                            for k in pubkeys_to_modify.iter() {
+                                assert!(bank_at_fork_tip.load_slow(&ancestors, k).is_some());
+                            }
+
+                            // Now that the slots removed in the previous loop iteration have
+                            // been recreated, send the previous bank; its scan should fail
+                            // even though the same slot numbers exist again
+                            if bank_to_scan_sender.send(prev_bank.clone()).is_err() {
+                                return;
+                            }
+
+                            // Wait for the scan to finish before starting the next iteration
+                            let finished_scan_bank_id = scan_finished_receiver.recv();
+                            if finished_scan_bank_id.is_err() {
+                                return;
+                            }
+                            assert_eq!(finished_scan_bank_id.unwrap(), prev_bank.bank_id());
+                        }
+                        bank_at_fork_tip.remove_unrooted_slots(&slots_on_fork);
+                        prev_bank = bank_at_fork_tip;
+                    }
+                },
+                None,
+                // The scanned bank's slots were removed before the scan starts, so it
+                // should error every time
+                AcceptableScanResults::DroppedSlotError,
+            );
+        }
+    }
+
+    #[test]
+    fn test_remove_unrooted_scan_interleaved_with_remove_unrooted_slots() {
+        for accounts_db_caching_enabled in &[false, true] {
+            test_store_scan_consistency(
+                *accounts_db_caching_enabled,
+                |bank0,
+                 bank_to_scan_sender,
+                 scan_finished_receiver,
+                 pubkeys_to_modify,
+                 program_id,
+                 starting_lamports| {
+                    loop {
+                        let step_size = 2;
+                        let (bank_at_fork_tip, slots_on_fork, ancestors) =
+                            setup_banks_on_fork_to_remove(
+                                bank0.clone(),
+                                pubkeys_to_modify.clone(),
+                                &program_id,
+                                starting_lamports,
+                                10,
+                                step_size,
+                            );
+                        // Although we dumped the slots last iteration via
+                        // `remove_unrooted_slots()`, we've recreated those slots this
+                        // iteration, so they should be findable again
+                        for k in pubkeys_to_modify.iter() {
+                            assert!(bank_at_fork_tip.load_slow(&ancestors, k).is_some());
+                        }
+
+                        // Send the current fork tip for scanning; the scan races with the
+                        // removal below, so it may or may not fail
+                        if bank_to_scan_sender.send(bank_at_fork_tip.clone()).is_err() {
+                            return;
+                        }
+
+                        // Remove one (fewer than `step_size`) of the *latest* slots while
+                        // the scan is happening. This should create inconsistency between
+                        // the account balances of accounts stored in that slot and the
+                        // accounts stored in earlier slots
+                        let slot_to_remove = *slots_on_fork.last().unwrap();
+                        bank_at_fork_tip.remove_unrooted_slots(&[slot_to_remove]);
+
+                        // Wait for the scan to finish before starting the next iteration
+                        let finished_scan_bank_id = scan_finished_receiver.recv();
+                        if finished_scan_bank_id.is_err() {
+                            return;
+                        }
+                        assert_eq!(finished_scan_bank_id.unwrap(), bank_at_fork_tip.bank_id());
+
+                        // Remove the rest of the slots before the next iteration
+                        for (slot, bank_id) in slots_on_fork {
+                            bank_at_fork_tip.remove_unrooted_slots(&[(slot, bank_id)]);
+                        }
+                    }
+                },
+                None,
+                // The scan races with `remove_unrooted_slots()`, so it may either
+                // succeed or fail with `SlotRemoved`
+                AcceptableScanResults::Both,
+            );
+        }
+    }
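Note: the three tests above differ only in when `remove_unrooted_slots` runs relative to the scan. That is also why the setup derives balances from `bank_id()`: assuming bank ids increase with every bank created, each rebuild of the fork stores balances no earlier version ever stored, so a scan that stitches together two slot versions trips the equal-balance assertion. A small illustration of that arithmetic (hypothetical helper, not in the patch):

```rust
// Each round of `step_size` slots stores `bank_id + starting_lamports + 1`.
fn lamports_for_round(first_bank_id_of_round: BankId, starting_lamports: u64) -> u64 {
    first_bank_id_of_round + starting_lamports + 1
}

#[test]
fn rebuilt_forks_never_reuse_balances() {
    // Fork first built when bank ids were 7..9, rebuilt later with ids 17..19
    assert_ne!(lamports_for_round(7, 1_000), lamports_for_round(17, 1_000));
}
```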
@@ -12664,21 +12995,25 @@
         // Return only one largest account
         assert_eq!(
-            bank.get_largest_accounts(1, &pubkeys_hashset, AccountAddressFilter::Include),
+            bank.get_largest_accounts(1, &pubkeys_hashset, AccountAddressFilter::Include)
+                .unwrap(),
             vec![(pubkeys[4], sol_to_lamports(5.0))]
         );
         assert_eq!(
-            bank.get_largest_accounts(1, &HashSet::new(), AccountAddressFilter::Exclude),
+            bank.get_largest_accounts(1, &HashSet::new(), AccountAddressFilter::Exclude)
+                .unwrap(),
             vec![(pubkeys[4], sol_to_lamports(5.0))]
         );
         assert_eq!(
-            bank.get_largest_accounts(1, &exclude4, AccountAddressFilter::Exclude),
+            bank.get_largest_accounts(1, &exclude4, AccountAddressFilter::Exclude)
+                .unwrap(),
             vec![(pubkeys[3], sol_to_lamports(4.0))]
         );
 
         // Return all added accounts
-        let results =
-            bank.get_largest_accounts(10, &pubkeys_hashset, AccountAddressFilter::Include);
+        let results = bank
+            .get_largest_accounts(10, &pubkeys_hashset, AccountAddressFilter::Include)
+            .unwrap();
         assert_eq!(results.len(), sorted_accounts.len());
         for pubkey_balance in sorted_accounts.iter() {
             assert!(results.contains(pubkey_balance));
@@ -12688,7 +13023,9 @@
         assert_eq!(sorted_results, results);
         let expected_accounts = sorted_accounts[1..].to_vec();
-        let results = bank.get_largest_accounts(10, &exclude4, AccountAddressFilter::Exclude);
+        let results = bank
+            .get_largest_accounts(10, &exclude4, AccountAddressFilter::Exclude)
+            .unwrap();
         // results include 5 Bank builtins
         assert_eq!(results.len(), 10);
         for pubkey_balance in expected_accounts.iter() {
             assert!(results.contains(pubkey_balance));
@@ -12700,14 +13037,18 @@
         // Return 3 added accounts
         let expected_accounts = sorted_accounts[0..4].to_vec();
-        let results = bank.get_largest_accounts(4, &pubkeys_hashset, AccountAddressFilter::Include);
+        let results = bank
+            .get_largest_accounts(4, &pubkeys_hashset, AccountAddressFilter::Include)
+            .unwrap();
         assert_eq!(results.len(), expected_accounts.len());
         for pubkey_balance in expected_accounts.iter() {
             assert!(results.contains(pubkey_balance));
         }
 
         let expected_accounts = expected_accounts[1..4].to_vec();
-        let results = bank.get_largest_accounts(3, &exclude4, AccountAddressFilter::Exclude);
+        let results = bank
+            .get_largest_accounts(3, &exclude4, AccountAddressFilter::Exclude)
+            .unwrap();
         assert_eq!(results.len(), expected_accounts.len());
         for pubkey_balance in expected_accounts.iter() {
             assert!(results.contains(pubkey_balance));
@@ -12719,7 +13060,8 @@
             .cloned()
             .collect();
         assert_eq!(
-            bank.get_largest_accounts(2, &exclude, AccountAddressFilter::Exclude),
+            bank.get_largest_accounts(2, &exclude, AccountAddressFilter::Exclude)
+                .unwrap(),
             vec![pubkeys_balances[3], pubkeys_balances[1]]
         );
     }
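Note: the supply calculation below becomes fallible end to end; the `?` operators propagate a `ScanError` from either the indexed or the full program-accounts scan. A sketch of the resulting caller-side pattern (hypothetical wrapper, not part of the patch):

```rust
// Callers now choose a policy for an interrupted scan instead of silently
// receiving a possibly torn supply figure.
fn non_circulating_lamports(bank: &Arc<Bank>) -> ScanResult<u64> {
    Ok(calculate_non_circulating_supply(bank)?.lamports)
}
```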
diff --git a/runtime/src/non_circulating_supply.rs b/runtime/src/non_circulating_supply.rs
index fee9a314d2..76668ac811 100644
--- a/runtime/src/non_circulating_supply.rs
+++ b/runtime/src/non_circulating_supply.rs
@@ -1,6 +1,6 @@
 use {
     crate::{
-        accounts_index::{AccountIndex, IndexKey},
+        accounts_index::{AccountIndex, IndexKey, ScanResult},
         bank::Bank,
     },
     log::*,
@@ -14,7 +14,7 @@ pub struct NonCirculatingSupply {
     pub accounts: Vec<Pubkey>,
 }
 
-pub fn calculate_non_circulating_supply(bank: &Arc<Bank>) -> NonCirculatingSupply {
+pub fn calculate_non_circulating_supply(bank: &Arc<Bank>) -> ScanResult<NonCirculatingSupply> {
     debug!("Updating Bank supply, epoch: {}", bank.epoch());
     let mut non_circulating_accounts_set: HashSet<Pubkey> = HashSet::new();
 
@@ -38,10 +38,11 @@ pub fn calculate_non_circulating_supply(bank: &Arc<Bank>) -> NonCirculatingSuppl
             // zero-lamport Account::Default() after being wiped and reinitialized in later
             // updates. We include the redundant filter here to avoid returning these accounts.
             |account| account.owner() == &solana_stake_program::id(),
-        )
+        )?
     } else {
-        bank.get_program_accounts(&solana_stake_program::id())
+        bank.get_program_accounts(&solana_stake_program::id())?
     };
+
     for (pubkey, account) in stake_accounts.iter() {
         let stake_account = StakeState::from(account).unwrap_or_default();
         match stake_account {
@@ -68,10 +69,10 @@ pub fn calculate_non_circulating_supply(bank: &Arc<Bank>) -> NonCirculatingSuppl
         .map(|pubkey| bank.get_balance(&pubkey))
         .sum();
 
-    NonCirculatingSupply {
+    Ok(NonCirculatingSupply {
         lamports,
         accounts: non_circulating_accounts_set.into_iter().collect(),
-    }
+    })
 }
 
 // Mainnet-beta accounts that should be considered non-circulating
@@ -256,7 +257,7 @@ mod tests {
                 + sysvar_and_native_program_delta,
         );
 
-        let non_circulating_supply = calculate_non_circulating_supply(&bank);
+        let non_circulating_supply = calculate_non_circulating_supply(&bank).unwrap();
         assert_eq!(
             non_circulating_supply.lamports,
             (num_non_circulating_accounts + num_stake_accounts) * balance
@@ -274,7 +275,7 @@ mod tests {
                 &AccountSharedData::new(new_balance, 0, &Pubkey::default()),
             );
         }
-        let non_circulating_supply = calculate_non_circulating_supply(&bank);
+        let non_circulating_supply = calculate_non_circulating_supply(&bank).unwrap();
         assert_eq!(
             non_circulating_supply.lamports,
             (num_non_circulating_accounts * new_balance) + (num_stake_accounts * balance)
@@ -289,7 +290,7 @@ mod tests {
             bank = Arc::new(new_from_parent(&bank));
         }
         assert_eq!(bank.epoch(), 1);
-        let non_circulating_supply = calculate_non_circulating_supply(&bank);
+        let non_circulating_supply = calculate_non_circulating_supply(&bank).unwrap();
         assert_eq!(
             non_circulating_supply.lamports,
             num_non_circulating_accounts * new_balance
diff --git a/sdk/program/src/clock.rs b/sdk/program/src/clock.rs
index 51f45fc23a..5cf5b27fc4 100644
--- a/sdk/program/src/clock.rs
+++ b/sdk/program/src/clock.rs
@@ -77,6 +77,10 @@ pub const MAX_TRANSACTION_FORWARDING_DELAY: usize = 6;
 /// is some number of Ticks long.
 pub type Slot = u64;
 
+/// Uniquely distinguishes every version of a slot, even if the
+/// slot number is the same, i.e. duplicate slots
+pub type BankId = u64;
+
 /// Epoch is a unit of time a given leader schedule is honored,
 /// some number of Slots.
 pub type Epoch = u64;
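Note: `BankId` is deliberately a bare `u64` alongside `Slot`, so APIs that took `&[Slot]` can migrate to `&[(Slot, BankId)]` with minimal churn. An illustrative helper for the pairing shape such call sites end up building (hypothetical, not part of the patch):

```rust
use solana_sdk::clock::{BankId, Slot};

// A (Slot, BankId) pair names one exact version of a slot; two attempts at
// the same slot share the Slot but never the BankId.
fn fork_keys(slots: &[Slot], bank_ids: &[BankId]) -> Vec<(Slot, BankId)> {
    assert_eq!(slots.len(), bank_ids.len());
    slots.iter().copied().zip(bank_ids.iter().copied()).collect()
}
```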