From 791fb17437a95be36db9b1132316c6fc776573c6 Mon Sep 17 00:00:00 2001 From: carllin Date: Fri, 20 Nov 2020 13:01:04 -0800 Subject: [PATCH] Prevent scans on unrooted slots from seeing partial clean (#13628) Co-authored-by: Carl Lin --- Cargo.lock | 1 + client/src/thin_client.rs | 12 +- local-cluster/Cargo.toml | 1 + local-cluster/tests/local_cluster.rs | 320 ++++++++++++++++++++++----- runtime/src/accounts_db.rs | 6 +- runtime/src/accounts_index.rs | 130 ++++++++++- runtime/src/bank.rs | 92 +++++++- 7 files changed, 500 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 72a6c52a1..4046a91b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4352,6 +4352,7 @@ name = "solana-local-cluster" version = "1.5.0" dependencies = [ "assert_matches", + "crossbeam-channel", "fs_extra", "gag", "itertools 0.9.0", diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 14dcbec0d..c99a8747b 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -3,7 +3,7 @@ //! messages to the network directly. The binary encoding of its messages are //! unstable and may change in future releases. -use crate::{rpc_client::RpcClient, rpc_response::Response}; +use crate::{rpc_client::RpcClient, rpc_config::RpcProgramAccountsConfig, rpc_response::Response}; use bincode::{serialize_into, serialized_size}; use log::*; use solana_sdk::{ @@ -276,6 +276,16 @@ impl ThinClient { ) } + pub fn get_program_accounts_with_config( + &self, + pubkey: &Pubkey, + config: RpcProgramAccountsConfig, + ) -> TransportResult> { + self.rpc_client() + .get_program_accounts_with_config(pubkey, config) + .map_err(|e| e.into()) + } + pub fn wait_for_balance_with_commitment( &self, pubkey: &Pubkey, diff --git a/local-cluster/Cargo.toml b/local-cluster/Cargo.toml index 2bb655e3a..0403fc30a 100644 --- a/local-cluster/Cargo.toml +++ b/local-cluster/Cargo.toml @@ -9,6 +9,7 @@ license = "Apache-2.0" homepage = "https://solana.com/" [dependencies] +crossbeam-channel = "0.4" itertools = "0.9.0" gag = "0.1.10" fs_extra = "1.1.0" diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 6b89fc9e0..2e142b2b2 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -1,10 +1,14 @@ use assert_matches::assert_matches; +use crossbeam_channel::{unbounded, Receiver}; use gag::BufferRedirect; use log::*; use serial_test_derive::serial; use solana_client::{ - pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::RpcSignatureResult, - thin_client::create_client, + pubsub_client::PubsubClient, + rpc_client::RpcClient, + rpc_config::RpcProgramAccountsConfig, + rpc_response::RpcSignatureResult, + thin_client::{create_client, ThinClient}, }; use solana_core::{ broadcast_stage::BroadcastStageType, @@ -42,7 +46,7 @@ use solana_sdk::{ poh_config::PohConfig, pubkey::Pubkey, signature::{Keypair, Signer}, - system_transaction, + system_program, system_transaction, }; use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use std::{ @@ -53,7 +57,7 @@ use std::{ path::{Path, PathBuf}, sync::atomic::{AtomicBool, Ordering}, sync::Arc, - thread::sleep, + thread::{sleep, Builder, JoinHandle}, time::Duration, }; use tempfile::TempDir; @@ -301,8 +305,8 @@ fn run_cluster_partition( on_partition_resolved: F, additional_accounts: Vec<(Pubkey, Account)>, ) where - E: Fn(&mut LocalCluster), - F: Fn(&mut LocalCluster), + E: FnOnce(&mut LocalCluster), + F: FnOnce(&mut LocalCluster), { solana_logger::setup(); info!("PARTITION_TEST!"); @@ -454,6 +458,35 @@ 
fn test_cluster_partition_1_1_1() { ) } +fn create_custom_leader_schedule( + num_validators: usize, + num_slots_per_validator: usize, +) -> (LeaderSchedule, Vec>) { + let mut leader_schedule = vec![]; + let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(Keypair::new())) + .take(num_validators) + .collect(); + for (i, k) in validator_keys.iter().enumerate() { + let num_slots = { + if i == 0 { + // Set up the leader to have 50% of the slots + num_slots_per_validator * (num_validators - 1) + } else { + num_slots_per_validator + } + }; + for _ in 0..num_slots { + leader_schedule.push(k.pubkey()) + } + } + + info!("leader_schedule: {}", leader_schedule.len()); + ( + LeaderSchedule::new_from_schedule(leader_schedule), + validator_keys, + ) +} + #[test] #[serial] fn test_kill_heaviest_partition() { @@ -465,26 +498,10 @@ fn test_kill_heaviest_partition() { // 3) Kills the most staked partition. Validators are locked out, but should all // eventually choose the major partition // 4) Check for recovery - let mut leader_schedule = vec![]; let num_slots_per_validator = 8; let partitions: [&[usize]; 4] = [&[11], &[10], &[10], &[10]]; - let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(Keypair::new())) - .take(partitions.len()) - .collect(); - for (i, k) in validator_keys.iter().enumerate() { - let num_slots = { - if i == 0 { - // Set up the leader to have 50% of the slots - num_slots_per_validator * (partitions.len() - 1) - } else { - num_slots_per_validator - } - }; - for _ in 0..num_slots { - leader_schedule.push(k.pubkey()) - } - } - info!("leader_schedule: {}", leader_schedule.len()); + let (leader_schedule, validator_keys) = + create_custom_leader_schedule(partitions.len(), num_slots_per_validator); let empty = |_: &mut LocalCluster| {}; let validator_to_kill = validator_keys[0].pubkey(); @@ -495,10 +512,7 @@ fn test_kill_heaviest_partition() { }; run_cluster_partition( &partitions, - Some(( - LeaderSchedule::new_from_schedule(leader_schedule), - validator_keys, - )), + Some((leader_schedule, validator_keys)), empty, on_partition_resolved, vec![], @@ -526,30 +540,14 @@ fn run_kill_partition_switch_threshold( // 1) Spins up three partitions // 2) Kills the first partition with the stake `failures_stake` // 5) runs `on_partition_resolved` - let mut leader_schedule = vec![]; let num_slots_per_validator = 8; let partitions: [&[usize]; 3] = [ &[(failures_stake as usize)], &[(alive_stake_1 as usize)], &[(alive_stake_2 as usize)], ]; - let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(Keypair::new())) - .take(partitions.len()) - .collect(); - for (i, k) in validator_keys.iter().enumerate() { - let num_slots = { - if i == 0 { - // Set up the leader to have 50% of the slots - num_slots_per_validator * (partitions.len() - 1) - } else { - num_slots_per_validator - } - }; - for _ in 0..num_slots { - leader_schedule.push(k.pubkey()) - } - } - info!("leader_schedule: {}", leader_schedule.len()); + let (leader_schedule, validator_keys) = + create_custom_leader_schedule(partitions.len(), num_slots_per_validator); let validator_to_kill = validator_keys[0].pubkey(); let on_partition_start = |cluster: &mut LocalCluster| { @@ -558,10 +556,7 @@ fn run_kill_partition_switch_threshold( }; run_cluster_partition( &partitions, - Some(( - LeaderSchedule::new_from_schedule(leader_schedule), - validator_keys, - )), + Some((leader_schedule, validator_keys)), on_partition_start, on_partition_resolved, vec![], @@ -2143,6 +2138,231 @@ fn test_optimistic_confirmation_violation_without_tower() { 
do_test_optimistic_confirmation_violation_with_or_without_tower(false); } +#[test] +#[serial] +fn test_run_test_load_program_accounts_root() { + run_test_load_program_accounts(CommitmentConfig::root()); +} + +#[test] +#[serial] +fn test_run_test_load_program_accounts_partition_root() { + run_test_load_program_accounts_partition(CommitmentConfig::root()); +} + +fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) { + let num_slots_per_validator = 8; + let partitions: [&[usize]; 2] = [&[(1 as usize)], &[(1 as usize)]]; + let (leader_schedule, validator_keys) = + create_custom_leader_schedule(partitions.len(), num_slots_per_validator); + + let (update_client_sender, update_client_receiver) = unbounded(); + let (scan_client_sender, scan_client_receiver) = unbounded(); + let exit = Arc::new(AtomicBool::new(false)); + + let (t_update, t_scan, additional_accounts) = setup_transfer_scan_threads( + 1000, + exit.clone(), + scan_commitment, + update_client_receiver, + scan_client_receiver, + ); + + let on_partition_start = |cluster: &mut LocalCluster| { + let update_client = cluster + .get_validator_client(&cluster.entry_point_info.id) + .unwrap(); + update_client_sender.send(update_client).unwrap(); + let scan_client = cluster + .get_validator_client(&cluster.entry_point_info.id) + .unwrap(); + scan_client_sender.send(scan_client).unwrap(); + }; + + let on_partition_resolved = |cluster: &mut LocalCluster| { + cluster.check_for_new_roots(20, &"run_test_load_program_accounts_partition"); + exit.store(true, Ordering::Relaxed); + t_update.join().unwrap(); + t_scan.join().unwrap(); + }; + + run_cluster_partition( + &partitions, + Some((leader_schedule, validator_keys)), + on_partition_start, + on_partition_resolved, + additional_accounts, + ); +} + +fn setup_transfer_scan_threads( + num_starting_accounts: usize, + exit: Arc, + scan_commitment: CommitmentConfig, + update_client_receiver: Receiver, + scan_client_receiver: Receiver, +) -> (JoinHandle<()>, JoinHandle<()>, Vec<(Pubkey, Account)>) { + let exit_ = exit.clone(); + let starting_keypairs: Arc> = Arc::new( + iter::repeat_with(Keypair::new) + .take(num_starting_accounts) + .collect(), + ); + let target_keypairs: Arc> = Arc::new( + iter::repeat_with(Keypair::new) + .take(num_starting_accounts) + .collect(), + ); + let starting_accounts: Vec<(Pubkey, Account)> = starting_keypairs + .iter() + .map(|k| (k.pubkey(), Account::new(1, 0, &system_program::id()))) + .collect(); + + let starting_keypairs_ = starting_keypairs.clone(); + let target_keypairs_ = target_keypairs.clone(); + let t_update = Builder::new() + .name("update".to_string()) + .spawn(move || { + let client = update_client_receiver.recv().unwrap(); + loop { + if exit_.load(Ordering::Relaxed) { + return; + } + let (blockhash, _fee_calculator, _last_valid_slot) = client + .get_recent_blockhash_with_commitment(CommitmentConfig::recent()) + .unwrap(); + for i in 0..starting_keypairs_.len() { + client + .async_transfer( + 1, + &starting_keypairs_[i], + &target_keypairs_[i].pubkey(), + blockhash, + ) + .unwrap(); + } + for i in 0..starting_keypairs_.len() { + client + .async_transfer( + 1, + &target_keypairs_[i], + &starting_keypairs_[i].pubkey(), + blockhash, + ) + .unwrap(); + } + } + }) + .unwrap(); + + // Scan, the total funds should add up to the original + let mut scan_commitment_config = RpcProgramAccountsConfig::default(); + scan_commitment_config.account_config.commitment = Some(scan_commitment); + let tracked_pubkeys: HashSet = starting_keypairs + .iter() + 
.chain(target_keypairs.iter()) + .map(|k| k.pubkey()) + .collect(); + let expected_total_balance = num_starting_accounts as u64; + let t_scan = Builder::new() + .name("scan".to_string()) + .spawn(move || { + let client = scan_client_receiver.recv().unwrap(); + loop { + if exit.load(Ordering::Relaxed) { + return; + } + if let Some(total_scan_balance) = client + .get_program_accounts_with_config( + &system_program::id(), + scan_commitment_config.clone(), + ) + .ok() + .map(|result| { + result + .into_iter() + .map(|(key, account)| { + if tracked_pubkeys.contains(&key) { + account.lamports + } else { + 0 + } + }) + .sum::() + }) + { + assert_eq!(total_scan_balance, expected_total_balance); + } + } + }) + .unwrap(); + + (t_update, t_scan, starting_accounts) +} + +fn run_test_load_program_accounts(scan_commitment: CommitmentConfig) { + solana_logger::setup(); + // First set up the cluster with 2 nodes + let slots_per_epoch = 2048; + let node_stakes = vec![51, 50]; + let validator_keys: Vec<_> = vec![ + "4qhhXNTbKD1a5vxDDLZcHKj7ELNeiivtUBxn3wUK1F5VRsQVP89VUhfXqSfgiFB14GfuBgtrQ96n9NvWQADVkcCg", + "3kHBzVwie5vTEaY6nFCPeFT8qDpoXzn7dCEioGRNBTnUDpvwnG85w8Wq63gVWpVTP8k2a8cgcWRjSXyUkEygpXWS", + ] + .iter() + .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) + .take(node_stakes.len()) + .collect(); + + let num_starting_accounts = 1000; + let exit = Arc::new(AtomicBool::new(false)); + let (update_client_sender, update_client_receiver) = unbounded(); + let (scan_client_sender, scan_client_receiver) = unbounded(); + + // Setup the update/scan threads + let (t_update, t_scan, starting_accounts) = setup_transfer_scan_threads( + num_starting_accounts, + exit.clone(), + scan_commitment, + update_client_receiver, + scan_client_receiver, + ); + + let mut config = ClusterConfig { + cluster_lamports: 100_000, + node_stakes: node_stakes.clone(), + validator_configs: vec![ValidatorConfig::default(); node_stakes.len()], + validator_keys: Some(validator_keys), + slots_per_epoch, + stakers_slot_offset: slots_per_epoch, + skip_warmup_slots: true, + additional_accounts: starting_accounts, + ..ClusterConfig::default() + }; + let cluster = LocalCluster::new(&mut config); + + // Give the threads a client to use for querying the cluster + let all_pubkeys = cluster.get_node_pubkeys(); + let other_validator_id = all_pubkeys + .into_iter() + .find(|x| *x != cluster.entry_point_info.id) + .unwrap(); + let client = cluster + .get_validator_client(&cluster.entry_point_info.id) + .unwrap(); + update_client_sender.send(client).unwrap(); + let scan_client = cluster.get_validator_client(&other_validator_id).unwrap(); + scan_client_sender.send(scan_client).unwrap(); + + // Wait for some roots to pass + cluster.check_for_new_roots(40, &"run_test_load_program_accounts"); + + // Exit and ensure no violations of consistency were found + exit.store(true, Ordering::Relaxed); + t_update.join().unwrap(); + t_scan.join().unwrap(); +} + fn wait_for_next_snapshot( cluster: &LocalCluster, snapshot_package_output_path: &Path, diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index b5cdc478c..dfa168484 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -3043,7 +3043,7 @@ pub mod tests { assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1); let accounts: Vec = - db.scan_accounts(&ancestors, |accounts: &mut Vec, option| { + db.unchecked_scan_accounts(&ancestors, |accounts: &mut Vec, option| { if let Some(data) = option { accounts.push(data.1); } @@ -4368,7 +4368,7 @@ pub mod tests { let 
ancestors = vec![(0, 0)].into_iter().collect();
         let accounts: Vec<Account> =
-            db.scan_accounts(&ancestors, |accounts: &mut Vec<Account>, option| {
+            db.unchecked_scan_accounts(&ancestors, |accounts: &mut Vec<Account>, option| {
                 if let Some(data) = option {
                     accounts.push(data.1);
                 }
@@ -4377,7 +4377,7 @@ pub mod tests {
         let ancestors = vec![(1, 1), (0, 0)].into_iter().collect();
         let accounts: Vec<Account> =
-            db.scan_accounts(&ancestors, |accounts: &mut Vec<Account>, option| {
+            db.unchecked_scan_accounts(&ancestors, |accounts: &mut Vec<Account>, option| {
                 if let Some(data) = option {
                     accounts.push(data.1);
                 }
diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs
index 142eb1a4f..a55af96b7 100644
--- a/runtime/src/accounts_index.rs
+++ b/runtime/src/accounts_index.rs
@@ -220,9 +220,124 @@ impl AccountsIndex {
             // deadlock
             let max_root = self.max_root();
             *w_ongoing_scan_roots.entry(max_root).or_default() += 1;
+
             max_root
         };
 
+        // First we show that for any bank `B` that is a descendant of
+        // the current `max_root`, it must be true that `B.ancestors.contains(max_root)`,
+        // regardless of the pattern of `squash()` behavior, where `ancestors` is the set
+        // of ancestors that is tracked in each bank.
+        //
+        // Proof: At startup, if starting from a snapshot, generate_index() adds all banks
+        // in the snapshot to the index via `add_root()` and so `max_root` will be the
+        // greatest of these. Thus the claim holds at startup, since there are no
+        // descendants of `max_root`.
+        //
+        // Now we proceed by induction on each `BankForks::set_root()`.
+        // Assume the claim holds when the `max_root` is `R`. Call the set of
+        // descendants of `R` present in BankForks `R_descendants`.
+        //
+        // Then for any bank `B` in `R_descendants`, it must be that `B.ancestors.contains(S)`,
+        // where `S` is any ancestor of `B` such that `S >= R`.
+        //
+        // For example:
+        //     `R` -> `A` -> `C` -> `B`
+        // Then `B.ancestors == {R, A, C}`
+        //
+        // Next we call `BankForks::set_root()` at some descendant of `R`, `R_new`,
+        // where `R_new > R`.
+        //
+        // When we squash `R_new`, `max_root` in the AccountsIndex here is now set to `R_new`,
+        // and all nondescendants of `R_new` are pruned.
+        //
+        // Now consider any outstanding references to banks in the system that are descended from
+        // `max_root == R_new`. Take any one of these references and call it `B`. Because `B` is
+        // a descendant of `R_new`, this means `B` was also a descendant of `R`. Thus `B`
+        // must be a member of `R_descendants` because `B` was constructed and added to
+        // BankForks before the `set_root`.
+        //
+        // By the guarantee on `R_descendants` described above, because `R_new` is an ancestor
+        // of `B`, and `R < R_new < B`, it follows that `B.ancestors.contains(R_new)`.
+        //
+        // Now until the next `set_root`, any new banks constructed from `new_from_parent` will
+        // also have `R_new` (the current `max_root`) in their ancestor set, so the claim holds
+        // for those descendants as well. Once the next `set_root` happens, we once again update
+        // `max_root` and the same inductive argument can be applied again to show the claim holds.
+
+        // Check that the `max_root` is present in `ancestors`. From the proof above, if
+        // `max_root` is not present in `ancestors`, this means the bank `B` with the
+        // given `ancestors` is not descended from `max_root`, which means
+        // either:
+        // 1) `B` is on a different fork or
+        // 2) `B` is an ancestor of `max_root`.
+        // In both cases we can ignore the given ancestors and instead just rely on the roots
+        // present in the index, as `max_root` indicates the roots in the index are more up to
+        // date than the given ancestors.
+        let empty = HashMap::new();
+        let ancestors = if ancestors.contains_key(&max_root) {
+            ancestors
+        } else {
+            /*
+            This takes care of edge cases like:
+
+            Diagram 1:
+
+                        slot 0
+                          |
+                        slot 1
+                      /        \
+                 slot 2         |
+                    |         slot 3 (max root)
+            slot 4 (scan)
+
+            By the time the scan on slot 4 is called, slot 2 may already have been
+            cleaned by a clean on slot 3, but slot 4 may not have been cleaned.
+            The state in slot 2 would have been purged and is not saved in any roots.
+            In this case, a scan on slot 4 wouldn't accurately reflect the state when bank 4
+            was frozen. In cases like this, we default to a scan on the latest roots by
+            removing all `ancestors`.
+            */
+            &empty
+        };
+
+        /*
+        Now there are two cases, either `ancestors` is empty or non-empty:
+
+        1) If `ancestors` is empty, then this is the same as a scan on a rooted bank,
+        and `ongoing_scan_roots` provides protection against cleanup of roots necessary
+        for the scan, and passing `Some(max_root)` to `do_scan_accounts()` ensures newer
+        roots don't appear in the scan.
+
+        2) If `ancestors` is non-empty, then from the `ancestors.contains_key(&max_root)` check
+        above, we know that the fork structure must look something like:
+
+        Diagram 2:
+
+              slot 0
+                |
+            slot 1 (max_root)
+            /            \
+         slot 2           |
+            |           slot 3 (potential newer max root)
+          slot 4
+            |
+          slot 5 (scan)
+
+        Consider both types of ancestors, ancestor <= `max_root` and
+        ancestor > `max_root`, where `max_root == 1` as illustrated above.
+
+        a) The set of `ancestors <= max_root` are all rooted, which means their state
+        is protected by the same guarantees as 1).
+
+        b) As for the `ancestors > max_root`, those banks have at least one reference discoverable
+        through the chain of `Bank::BankRc::parent` starting from the calling bank. For instance,
+        bank 5's parent reference keeps bank 4 alive, which will prevent `Bank::drop()` from
+        running and cleaning up bank 4. Furthermore, no cleans can happen past the saved `max_root == 1`,
+        so a potential newer max root at 3 will not clean up any of the ancestors > 1, so slot 4
+        will not be cleaned in the middle of the scan either.
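+
+        Taken together, 1), 2a), and 2b) imply that a scan over any bank whose `ancestors`
+        pass the `ancestors.contains_key(&max_root)` check above cannot observe a partial
+        clean: every slot the scan reads is either a root protected by `ongoing_scan_roots`
+        and bounded by the saved `max_root`, or an unrooted ancestor kept alive by the
+        caller's parent references and untouched by cleans newer than the saved `max_root`.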
+ */ self.do_scan_accounts(ancestors, func, range, Some(max_root)); { let mut ongoing_scan_roots = self.ongoing_scan_roots.write().unwrap(); @@ -246,6 +361,9 @@ impl AccountsIndex { self.do_scan_accounts(ancestors, func, range, None); } + // Scan accounts and return latest version of each account that is either: + // 1) rooted or + // 2) present in ancestors fn do_scan_accounts<'a, F, R>( &'a self, ancestors: &Ancestors, @@ -636,7 +754,7 @@ mod tests { assert!(index.get(&key.pubkey(), None, None).is_none()); let mut num = 0; - index.scan_accounts(&ancestors, |_pubkey, _index| num += 1); + index.unchecked_scan_accounts(&ancestors, |_pubkey, _index| num += 1); assert_eq!(num, 0); } @@ -653,7 +771,7 @@ mod tests { assert!(index.get(&key.pubkey(), None, None).is_none()); let mut num = 0; - index.scan_accounts(&ancestors, |_pubkey, _index| num += 1); + index.unchecked_scan_accounts(&ancestors, |_pubkey, _index| num += 1); assert_eq!(num, 0); } @@ -669,7 +787,7 @@ mod tests { assert!(index.get(&key.pubkey(), Some(&ancestors), None).is_none()); let mut num = 0; - index.scan_accounts(&ancestors, |_pubkey, _index| num += 1); + index.unchecked_scan_accounts(&ancestors, |_pubkey, _index| num += 1); assert_eq!(num, 0); } @@ -687,7 +805,7 @@ mod tests { let mut num = 0; let mut found_key = false; - index.scan_accounts(&ancestors, |pubkey, _index| { + index.unchecked_scan_accounts(&ancestors, |pubkey, _index| { if pubkey == &key.pubkey() { found_key = true }; @@ -813,7 +931,7 @@ mod tests { let ancestors: Ancestors = HashMap::new(); let mut scanned_keys = HashSet::new(); - index.scan_accounts(&ancestors, |pubkey, _index| { + index.unchecked_scan_accounts(&ancestors, |pubkey, _index| { scanned_keys.insert(*pubkey); }); assert_eq!(scanned_keys.len(), num_pubkeys); @@ -1011,7 +1129,7 @@ mod tests { let mut num = 0; let mut found_key = false; - index.scan_accounts(&Ancestors::new(), |pubkey, _index| { + index.unchecked_scan_accounts(&Ancestors::new(), |pubkey, _index| { if pubkey == &key.pubkey() { found_key = true; assert_eq!(_index, (&true, 3)); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 757d0f4e2..b8f821f35 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1775,7 +1775,6 @@ impl Bank { //this bank and all its parents are now on the rooted path let mut roots = vec![self.slot()]; roots.append(&mut self.parents().iter().map(|p| p.slot()).collect()); - *self.rc.parent.write().unwrap() = None; let mut squash_accounts_time = Measure::start("squash_accounts_time"); for slot in roots.iter().rev() { @@ -1784,6 +1783,8 @@ impl Bank { } squash_accounts_time.stop(); + *self.rc.parent.write().unwrap() = None; + let mut squash_cache_time = Measure::start("squash_cache_time"); roots .iter() @@ -3506,6 +3507,13 @@ impl Bank { parents } + /// Compute all the parents of the bank including this bank itself + pub fn parents_inclusive(self: &Arc) -> Vec> { + let mut all = vec![self.clone()]; + all.extend(self.parents().into_iter()); + all + } + pub fn store_account(&self, pubkey: &Pubkey, account: &Account) { self.rc.accounts.store_slow(self.slot(), pubkey, account); @@ -3686,7 +3694,7 @@ impl Bank { } pub fn get_largest_accounts( - &self, + self: &Arc, num: usize, filter_by_address: &HashSet, filter: AccountAddressFilter, @@ -10670,6 +10678,86 @@ pub(crate) mod tests { update_thread.join().unwrap(); } + #[test] + fn test_store_scan_consistency_unrooted() { + test_store_scan_consistency( + |bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| { + let mut 
current_major_fork_bank = bank0; + loop { + let mut current_minor_fork_bank = current_major_fork_bank.clone(); + let num_new_banks = 2; + let lamports = current_minor_fork_bank.slot() + starting_lamports + 1; + // Modify banks on the two banks on the minor fork + for pubkeys_to_modify in &pubkeys_to_modify + .iter() + .chunks(pubkeys_to_modify.len() / num_new_banks) + { + current_minor_fork_bank = Arc::new(Bank::new_from_parent( + ¤t_minor_fork_bank, + &solana_sdk::pubkey::new_rand(), + current_minor_fork_bank.slot() + 2, + )); + let account = Account::new(lamports, 0, &program_id); + // Write partial updates to each of the banks in the minor fork so if any of them + // get cleaned up, there will be keys with the wrong account value/missing. + for key in pubkeys_to_modify { + current_minor_fork_bank.store_account(key, &account); + } + current_minor_fork_bank.freeze(); + } + + // All the parent banks made in this iteration of the loop + // are currently discoverable, previous parents should have + // been squashed + assert_eq!( + current_minor_fork_bank.parents_inclusive().len(), + num_new_banks + 1, + ); + + // `next_major_bank` needs to be sandwiched between the minor fork banks + // That way, after the squash(), the minor fork has the potential to see a + // *partial* clean of the banks < `next_major_bank`. + current_major_fork_bank = Arc::new(Bank::new_from_parent( + ¤t_major_fork_bank, + &solana_sdk::pubkey::new_rand(), + current_minor_fork_bank.slot() - 1, + )); + let lamports = current_major_fork_bank.slot() + starting_lamports + 1; + let account = Account::new(lamports, 0, &program_id); + for key in pubkeys_to_modify.iter() { + // Store rooted updates to these pubkeys such that the minor + // fork updates to the same keys will be deleted by clean + current_major_fork_bank.store_account(key, &account); + } + + // Send the last new bank to the scan thread to perform the scan. + // Meanwhile this thread will continually set roots on a separate fork + // and squash. + /* + bank 0 + / \ + minor bank 1 \ + / current_major_fork_bank + minor bank 2 + + */ + // The capacity of the channel is 1 so that this thread will wait for the scan to finish before starting + // the next iteration, allowing the scan to stay in sync with these updates + // such that every scan will see this interruption. + current_major_fork_bank.freeze(); + current_major_fork_bank.squash(); + if bank_to_scan_sender.send(current_minor_fork_bank).is_err() { + // Channel was disconnected, exit + return; + } + + // Try to get clean to overlap with the scan + current_major_fork_bank.clean_accounts(false); + } + }, + ) + } + #[test] fn test_store_scan_consistency_root() { test_store_scan_consistency(