From 5a963529a83eee6d433568e4f78b5aea379b62ec Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 21 Oct 2023 15:56:43 +0900 Subject: [PATCH] Add BankWithScheduler for upcoming scheduler code (#33704) * Add BankWithScheduler for upcoming scheduler code * Remove too confusing insert_without_scheduler() * Add doc comment as a bonus * Simplify BankForks::banks() * Add derive(Debug) on BankWithScheduler --- core/benches/banking_stage.rs | 4 +- core/src/replay_stage.rs | 29 +++++-- core/tests/epoch_accounts_hash.rs | 52 ++++++----- ledger/src/blockstore_processor.rs | 110 +++++++++++++++--------- poh/src/poh_recorder.rs | 13 ++- program-test/src/lib.rs | 16 ++-- rpc/src/rpc.rs | 13 ++- runtime/src/bank.rs | 5 ++ runtime/src/bank_forks.rs | 48 +++++++---- runtime/src/installed_scheduler_pool.rs | 84 ++++++++++++++++++ runtime/src/lib.rs | 1 + 11 files changed, 270 insertions(+), 105 deletions(-) create mode 100644 runtime/src/installed_scheduler_pool.rs diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 2526c2a63..64300c274 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -37,7 +37,8 @@ use { }, solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry}, solana_runtime::{ - bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, + bank::Bank, bank_forks::BankForks, installed_scheduler_pool::BankWithScheduler, + prioritization_fee_cache::PrioritizationFeeCache, }, solana_sdk::{ genesis_config::GenesisConfig, @@ -398,6 +399,7 @@ fn simulate_process_entries( num_accounts: usize, ) { let bank = Arc::new(Bank::new_for_benches(genesis_config)); + let bank = BankWithScheduler::new_without_scheduler(bank); for i in 0..(num_accounts / 2) { bank.transfer(initial_lamports, mint_keypair, &keypairs[i * 2].pubkey()) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 2e9aba1dd..40483babb 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -61,6 +61,7 @@ use { bank::{bank_hash_details, Bank, NewBankOptions}, bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY}, commitment::BlockCommitmentCache, + installed_scheduler_pool::BankWithScheduler, prioritization_fee_cache::PrioritizationFeeCache, }, solana_sdk::{ @@ -1988,7 +1989,7 @@ impl ReplayStage { #[allow(clippy::too_many_arguments)] fn replay_blockstore_into_bank( - bank: &Arc, + bank: &BankWithScheduler, blockstore: &Blockstore, replay_stats: &RwLock, replay_progress: &RwLock, @@ -2599,7 +2600,11 @@ impl ReplayStage { return replay_result; } - let bank = bank_forks.read().unwrap().get(bank_slot).unwrap(); + let bank = bank_forks + .read() + .unwrap() + .get_with_scheduler(bank_slot) + .unwrap(); let parent_slot = bank.parent_slot(); let (num_blocks_on_fork, num_dropped_blocks_on_fork) = { let stats = progress_lock @@ -2687,7 +2692,11 @@ impl ReplayStage { debug!("bank_slot {:?} is marked dead", bank_slot); replay_result.is_slot_dead = true; } else { - let bank = bank_forks.read().unwrap().get(bank_slot).unwrap(); + let bank = bank_forks + .read() + .unwrap() + .get_with_scheduler(bank_slot) + .unwrap(); let parent_slot = bank.parent_slot(); let prev_leader_slot = progress.get_bank_prev_leader_slot(&bank); let (num_blocks_on_fork, num_dropped_blocks_on_fork) = { @@ -2768,7 +2777,11 @@ impl ReplayStage { } let bank_slot = replay_result.bank_slot; - let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap(); + let bank = &bank_forks + .read() + .unwrap() + .get_with_scheduler(bank_slot) + .unwrap(); if let Some(replay_result) = &replay_result.replay_result { match replay_result { Ok(replay_tx_count) => tx_count += replay_tx_count, @@ -2826,7 +2839,9 @@ impl ReplayStage { ); // report cost tracker stats cost_update_sender - .send(CostUpdate::FrozenBank { bank: bank.clone() }) + .send(CostUpdate::FrozenBank { + bank: bank.clone_without_scheduler(), + }) .unwrap_or_else(|err| { warn!("cost_update_sender failed sending bank stats: {:?}", err) }); @@ -2887,7 +2902,7 @@ impl ReplayStage { if let Some(sender) = bank_notification_sender { sender .sender - .send(BankNotification::Frozen(bank.clone())) + .send(BankNotification::Frozen(bank.clone_without_scheduler())) .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err)); } blockstore_processor::cache_block_meta(bank, cache_block_meta_sender); @@ -4747,7 +4762,7 @@ pub(crate) mod tests { assert_eq!(bank0.tick_height(), bank0.max_tick_height()); let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); - let bank1 = bank_forks.read().unwrap().get(1).unwrap(); + let bank1 = bank_forks.read().unwrap().get_with_scheduler(1).unwrap(); let bank1_progress = progress .entry(bank1.slot()) .or_insert_with(|| ForkProgress::new(bank1.last_blockhash(), None, None, 0, 0)); diff --git a/core/tests/epoch_accounts_hash.rs b/core/tests/epoch_accounts_hash.rs index 8fa6919e9..3b759a064 100755 --- a/core/tests/epoch_accounts_hash.rs +++ b/core/tests/epoch_accounts_hash.rs @@ -599,18 +599,22 @@ fn test_epoch_accounts_hash_and_warping() { ); // flush the write cache so warping can calculate the accounts hash from storages bank.force_flush_accounts_cache(); - let bank = bank_forks.write().unwrap().insert(Bank::warp_from_parent( - bank, - &Pubkey::default(), - eah_stop_slot_in_next_epoch, - CalcAccountsHashDataSource::Storages, - )); + let bank = bank_forks + .write() + .unwrap() + .insert(Bank::warp_from_parent( + bank, + &Pubkey::default(), + eah_stop_slot_in_next_epoch, + CalcAccountsHashDataSource::Storages, + )) + .clone_without_scheduler(); let slot = bank.slot().checked_add(1).unwrap(); - let bank = - bank_forks - .write() - .unwrap() - .insert(Bank::new_from_parent(bank, &Pubkey::default(), slot)); + let bank = bank_forks + .write() + .unwrap() + .insert(Bank::new_from_parent(bank, &Pubkey::default(), slot)) + .clone_without_scheduler(); bank_forks.write().unwrap().set_root( bank.slot(), &test_environment @@ -634,18 +638,22 @@ fn test_epoch_accounts_hash_and_warping() { epoch_schedule.get_first_slot_in_epoch(bank.epoch() + 1) + eah_start_offset; // flush the write cache so warping can calculate the accounts hash from storages bank.force_flush_accounts_cache(); - let bank = bank_forks.write().unwrap().insert(Bank::warp_from_parent( - bank, - &Pubkey::default(), - eah_start_slot_in_next_epoch, - CalcAccountsHashDataSource::Storages, - )); + let bank = bank_forks + .write() + .unwrap() + .insert(Bank::warp_from_parent( + bank, + &Pubkey::default(), + eah_start_slot_in_next_epoch, + CalcAccountsHashDataSource::Storages, + )) + .clone_without_scheduler(); let slot = bank.slot().checked_add(1).unwrap(); - let bank = - bank_forks - .write() - .unwrap() - .insert(Bank::new_from_parent(bank, &Pubkey::default(), slot)); + let bank = bank_forks + .write() + .unwrap() + .insert(Bank::new_from_parent(bank, &Pubkey::default(), slot)) + .clone_without_scheduler(); bank_forks.write().unwrap().set_root( bank.slot(), &test_environment diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index e17bc52a8..d89ee2758 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -39,6 +39,7 @@ use { bank_forks::BankForks, bank_utils, commitment::VOTE_THRESHOLD_SIZE, + installed_scheduler_pool::BankWithScheduler, prioritization_fee_cache::PrioritizationFeeCache, runtime_config::RuntimeConfig, transaction_batch::TransactionBatch, @@ -418,13 +419,13 @@ fn execute_batches( /// This method is for use testing against a single Bank, and assumes `Bank::transaction_count()` /// represents the number of transactions executed in this Bank pub fn process_entries_for_tests( - bank: &Arc, + bank: &BankWithScheduler, entries: Vec, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, ) -> Result<()> { let verify_transaction = { - let bank = bank.clone(); + let bank = bank.clone_with_scheduler(); move |versioned_tx: VersionedTransaction| -> Result { bank.verify_transaction(versioned_tx, TransactionVerificationMode::FullVerification) } @@ -463,7 +464,7 @@ pub fn process_entries_for_tests( } fn process_entries( - bank: &Arc, + bank: &BankWithScheduler, entries: &mut [ReplayEntry], transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, @@ -715,11 +716,16 @@ pub(crate) fn process_blockstore_for_bank_0( accounts_update_notifier, exit, ); + let bank0_slot = bank0.slot(); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0))); info!("Processing ledger for slot 0..."); process_bank_0( - &bank_forks.read().unwrap().root_bank(), + &bank_forks + .read() + .unwrap() + .get_with_scheduler(bank0_slot) + .unwrap(), blockstore, opts, &VerifyRecyclers::default(), @@ -889,7 +895,7 @@ fn verify_ticks( fn confirm_full_slot( blockstore: &Blockstore, - bank: &Arc, + bank: &BankWithScheduler, opts: &ProcessOptions, recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, @@ -1050,7 +1056,7 @@ impl ConfirmationProgress { #[allow(clippy::too_many_arguments)] pub fn confirm_slot( blockstore: &Blockstore, - bank: &Arc, + bank: &BankWithScheduler, timing: &mut ConfirmationTiming, progress: &mut ConfirmationProgress, skip_verification: bool, @@ -1095,7 +1101,7 @@ pub fn confirm_slot( #[allow(clippy::too_many_arguments)] fn confirm_slot_entries( - bank: &Arc, + bank: &BankWithScheduler, slot_entries_load_result: (Vec, u64, bool), timing: &mut ConfirmationTiming, progress: &mut ConfirmationProgress, @@ -1192,7 +1198,7 @@ fn confirm_slot_entries( }; let verify_transaction = { - let bank = bank.clone(); + let bank = bank.clone_with_scheduler(); move |versioned_tx: VersionedTransaction, verification_mode: TransactionVerificationMode| -> Result { @@ -1291,7 +1297,7 @@ fn confirm_slot_entries( // Special handling required for processing the entries in slot 0 fn process_bank_0( - bank0: &Arc, + bank0: &BankWithScheduler, blockstore: &Blockstore, opts: &ProcessOptions, recyclers: &VerifyRecyclers, @@ -1490,7 +1496,7 @@ fn load_frozen_forks( // Block must be frozen by this point; otherwise, // process_single_slot() would have errored above. assert!(bank.is_frozen()); - all_banks.insert(bank.slot(), bank.clone()); + all_banks.insert(bank.slot(), bank.clone_with_scheduler()); m.stop(); process_single_slot_us += m.as_us(); @@ -1520,7 +1526,7 @@ fn load_frozen_forks( // Ensure cluster-confirmed root and parents are set as root in blockstore let mut rooted_slots = vec![]; - let mut new_root_bank = cluster_root_bank.clone(); + let mut new_root_bank = cluster_root_bank.clone_without_scheduler(); loop { if new_root_bank.slot() == root { break; } // Found the last root in the chain, yay! assert!(new_root_bank.slot() > root); @@ -1675,7 +1681,7 @@ fn supermajority_root_from_vote_accounts( #[allow(clippy::too_many_arguments)] fn process_single_slot( blockstore: &Blockstore, - bank: &Arc, + bank: &BankWithScheduler, opts: &ProcessOptions, recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, @@ -1907,6 +1913,18 @@ pub mod tests { } } + fn process_entries_for_tests_without_scheduler( + bank: &Arc, + entries: Vec, + ) -> Result<()> { + process_entries_for_tests( + &BankWithScheduler::new_without_scheduler(bank.clone()), + entries, + None, + None, + ) + } + #[test] fn test_process_blockstore_with_missing_hashes() { do_test_process_blockstore_with_missing_hashes(AccessType::Primary); @@ -2602,7 +2620,7 @@ pub mod tests { ); // Now ensure the TX is accepted despite pointing to the ID of an empty entry. - process_entries_for_tests(&bank, slot_entries, None, None).unwrap(); + process_entries_for_tests_without_scheduler(&bank, slot_entries).unwrap(); assert_eq!(bank.process_transaction(&tx), Ok(())); } @@ -2737,7 +2755,7 @@ pub mod tests { assert_eq!(bank.tick_height(), 0); let tick = next_entry(&genesis_config.hash(), 1, vec![]); assert_eq!( - process_entries_for_tests(&bank, vec![tick], None, None), + process_entries_for_tests_without_scheduler(&bank, vec![tick]), Ok(()) ); assert_eq!(bank.tick_height(), 1); @@ -2772,7 +2790,7 @@ pub mod tests { ); let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]); assert_eq!( - process_entries_for_tests(&bank, vec![entry_1, entry_2], None, None), + process_entries_for_tests_without_scheduler(&bank, vec![entry_1, entry_2]), Ok(()) ); assert_eq!(bank.get_balance(&keypair1.pubkey()), 2); @@ -2828,11 +2846,9 @@ pub mod tests { ); assert_eq!( - process_entries_for_tests( + process_entries_for_tests_without_scheduler( &bank, vec![entry_1_to_mint, entry_2_to_3_mint_to_1], - None, - None, ), Ok(()) ); @@ -2899,11 +2915,9 @@ pub mod tests { ], ); - assert!(process_entries_for_tests( + assert!(process_entries_for_tests_without_scheduler( &bank, vec![entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()], - None, - None, ) .is_err()); @@ -3017,7 +3031,7 @@ pub mod tests { let entry = next_entry(&bank.last_blockhash(), 1, vec![tx]); let bank = Arc::new(bank); - let result = process_entries_for_tests(&bank, vec![entry], None, None); + let result = process_entries_for_tests_without_scheduler(&bank, vec![entry]); bank.freeze(); let blockhash_ok = bank.last_blockhash(); let bankhash_ok = bank.hash(); @@ -3058,7 +3072,7 @@ pub mod tests { let entry = next_entry(&bank.last_blockhash(), 1, vec![tx]); let bank = Arc::new(bank); - let _result = process_entries_for_tests(&bank, vec![entry], None, None); + let _result = process_entries_for_tests_without_scheduler(&bank, vec![entry]); bank.freeze(); assert_eq!(blockhash_ok, bank.last_blockhash()); @@ -3150,15 +3164,13 @@ pub mod tests { // keypair2=3 // keypair3=3 - assert!(process_entries_for_tests( + assert!(process_entries_for_tests_without_scheduler( &bank, vec![ entry_1_to_mint, entry_2_to_3_and_1_to_mint, entry_conflict_itself, ], - None, - None, ) .is_err()); @@ -3206,7 +3218,7 @@ pub mod tests { system_transaction::transfer(&keypair2, &keypair4.pubkey(), 1, bank.last_blockhash()); let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]); assert_eq!( - process_entries_for_tests(&bank, vec![entry_1, entry_2], None, None), + process_entries_for_tests_without_scheduler(&bank, vec![entry_1, entry_2]), Ok(()) ); assert_eq!(bank.get_balance(&keypair3.pubkey()), 1); @@ -3267,7 +3279,7 @@ pub mod tests { }) .collect(); assert_eq!( - process_entries_for_tests(&bank, entries, None, None), + process_entries_for_tests_without_scheduler(&bank, entries), Ok(()) ); } @@ -3330,7 +3342,7 @@ pub mod tests { // Transfer lamports to each other let entry = next_entry(&bank.last_blockhash(), 1, tx_vector); assert_eq!( - process_entries_for_tests(&bank, vec![entry], None, None), + process_entries_for_tests_without_scheduler(&bank, vec![entry]), Ok(()) ); bank.squash(); @@ -3390,7 +3402,10 @@ pub mod tests { system_transaction::transfer(&keypair1, &keypair4.pubkey(), 1, bank.last_blockhash()); let entry_2 = next_entry(&tick.hash, 1, vec![tx]); assert_eq!( - process_entries_for_tests(&bank, vec![entry_1, tick, entry_2.clone()], None, None,), + process_entries_for_tests_without_scheduler( + &bank, + vec![entry_1, tick, entry_2.clone()], + ), Ok(()) ); assert_eq!(bank.get_balance(&keypair3.pubkey()), 1); @@ -3401,7 +3416,7 @@ pub mod tests { system_transaction::transfer(&keypair2, &keypair3.pubkey(), 1, bank.last_blockhash()); let entry_3 = next_entry(&entry_2.hash, 1, vec![tx]); assert_eq!( - process_entries_for_tests(&bank, vec![entry_3], None, None), + process_entries_for_tests_without_scheduler(&bank, vec![entry_3]), Err(TransactionError::AccountNotFound) ); } @@ -3481,7 +3496,7 @@ pub mod tests { ); assert_eq!( - process_entries_for_tests(&bank, vec![entry_1_to_mint], None, None), + process_entries_for_tests_without_scheduler(&bank, vec![entry_1_to_mint]), Err(TransactionError::AccountInUse) ); @@ -3560,7 +3575,7 @@ pub mod tests { // Set up bank1 let mut bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config)); - let bank0 = bank_forks.get(0).unwrap(); + let bank0 = bank_forks.get_with_scheduler(0).unwrap(); let opts = ProcessOptions { run_verification: true, accounts_db_test_hash_calculation: true, @@ -3569,7 +3584,11 @@ pub mod tests { let recyclers = VerifyRecyclers::default(); process_bank_0(&bank0, &blockstore, &opts, &recyclers, None, None); let bank0_last_blockhash = bank0.last_blockhash(); - let bank1 = bank_forks.insert(Bank::new_from_parent(bank0, &Pubkey::default(), 1)); + let bank1 = bank_forks.insert(Bank::new_from_parent( + bank0.clone_without_scheduler(), + &Pubkey::default(), + 1, + )); confirm_full_slot( &blockstore, &bank1, @@ -3684,7 +3703,7 @@ pub mod tests { }) .collect(); info!("paying iteration {}", i); - process_entries_for_tests(&bank, entries, None, None).expect("paying failed"); + process_entries_for_tests_without_scheduler(&bank, entries).expect("paying failed"); let entries: Vec<_> = (0..NUM_TRANSFERS) .step_by(NUM_TRANSFERS_PER_ENTRY) @@ -3707,16 +3726,14 @@ pub mod tests { .collect(); info!("refunding iteration {}", i); - process_entries_for_tests(&bank, entries, None, None).expect("refunding failed"); + process_entries_for_tests_without_scheduler(&bank, entries).expect("refunding failed"); // advance to next block - process_entries_for_tests( + process_entries_for_tests_without_scheduler( &bank, (0..bank.ticks_per_slot()) .map(|_| next_entry_mut(&mut hash, 1, vec![])) .collect::>(), - None, - None, ) .expect("process ticks failed"); @@ -3756,7 +3773,7 @@ pub mod tests { let entry = next_entry(&new_blockhash, 1, vec![tx]); entries.push(entry); - process_entries_for_tests(&bank0, entries, None, None).unwrap(); + process_entries_for_tests_without_scheduler(&bank0, entries).unwrap(); assert_eq!(bank0.get_balance(&keypair.pubkey()), 1) } @@ -3922,7 +3939,12 @@ pub mod tests { .collect(); let entry = next_entry(&bank_1_blockhash, 1, vote_txs); let (replay_vote_sender, replay_vote_receiver) = crossbeam_channel::unbounded(); - let _ = process_entries_for_tests(&bank1, vec![entry], None, Some(&replay_vote_sender)); + let _ = process_entries_for_tests( + &BankWithScheduler::new_without_scheduler(bank1), + vec![entry], + None, + Some(&replay_vote_sender), + ); let successes: BTreeSet = replay_vote_receiver .try_iter() .map(|(vote_pubkey, ..)| vote_pubkey) @@ -4210,7 +4232,7 @@ pub mod tests { prev_entry_hash: Hash, ) -> result::Result<(), BlockstoreProcessorError> { confirm_slot_entries( - bank, + &BankWithScheduler::new_without_scheduler(bank.clone()), (slot_entries, 0, slot_full), &mut ConfirmationTiming::default(), &mut ConfirmationProgress::new(prev_entry_hash), @@ -4232,7 +4254,9 @@ pub mod tests { .. } = create_genesis_config(100 * LAMPORTS_PER_SOL); let genesis_hash = genesis_config.hash(); - let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let bank = BankWithScheduler::new_without_scheduler(Arc::new(Bank::new_for_tests( + &genesis_config, + ))); let mut timing = ConfirmationTiming::default(); let mut progress = ConfirmationProgress::new(genesis_hash); let amount = genesis_config.rent.minimum_balance(0); diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index bb14042cb..817c7548b 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -25,7 +25,7 @@ use { }, solana_measure::{measure, measure_us}, solana_metrics::poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo}, - solana_runtime::bank::Bank, + solana_runtime::{bank::Bank, installed_scheduler_pool::BankWithScheduler}, solana_sdk::{ clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, hash::Hash, @@ -264,9 +264,8 @@ impl PohRecorderBank { } } -#[derive(Clone)] pub struct WorkingBank { - pub bank: Arc, + pub bank: BankWithScheduler, pub start: Arc, pub min_tick_height: u64, pub max_tick_height: u64, @@ -596,7 +595,7 @@ impl PohRecorder { self.leader_last_tick_height = leader_last_tick_height; } - pub fn set_bank(&mut self, bank: Arc, track_transaction_indexes: bool) { + pub fn set_bank(&mut self, bank: BankWithScheduler, track_transaction_indexes: bool) { assert!(self.working_bank.is_none()); self.leader_bank_notifier.set_in_progress(&bank); let working_bank = WorkingBank { @@ -644,12 +643,12 @@ impl PohRecorder { #[cfg(feature = "dev-context-only-utils")] pub fn set_bank_for_test(&mut self, bank: Arc) { - self.set_bank(bank, false) + self.set_bank(BankWithScheduler::new_without_scheduler(bank), false) } #[cfg(test)] pub fn set_bank_with_transaction_index_for_test(&mut self, bank: Arc) { - self.set_bank(bank, true) + self.set_bank(BankWithScheduler::new_without_scheduler(bank), true) } // Flush cache will delay flushing the cache for a bank until it past the WorkingBank::min_tick_height @@ -1092,7 +1091,7 @@ pub fn create_test_recorder( ); let ticks_per_slot = bank.ticks_per_slot(); - poh_recorder.set_bank(bank, false); + poh_recorder.set_bank(BankWithScheduler::new_without_scheduler(bank), false); let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_service = PohService::new( poh_recorder.clone(), diff --git a/program-test/src/lib.rs b/program-test/src/lib.rs index 95b9e6103..5da053486 100644 --- a/program-test/src/lib.rs +++ b/program-test/src/lib.rs @@ -1129,13 +1129,15 @@ impl ProgramTestContext { bank.freeze(); bank } else { - bank_forks.insert(Bank::warp_from_parent( - bank, - &Pubkey::default(), - pre_warp_slot, - // some warping tests cannot use the append vecs because of the sequence of adding roots and flushing - solana_accounts_db::accounts_db::CalcAccountsHashDataSource::IndexForTests, - )) + bank_forks + .insert(Bank::warp_from_parent( + bank, + &Pubkey::default(), + pre_warp_slot, + // some warping tests cannot use the append vecs because of the sequence of adding roots and flushing + solana_accounts_db::accounts_db::CalcAccountsHashDataSource::IndexForTests, + )) + .clone_without_scheduler() }; let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded(); diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 709c18688..1a8cc045f 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -50,6 +50,7 @@ use { bank::{Bank, TransactionSimulationResult}, bank_forks::BankForks, commitment::{BlockCommitmentArray, BlockCommitmentCache, CommitmentSlots}, + installed_scheduler_pool::BankWithScheduler, non_circulating_supply::calculate_non_circulating_supply, prioritization_fee_cache::PrioritizationFeeCache, snapshot_config::SnapshotConfig, @@ -4602,7 +4603,7 @@ pub fn populate_blockstore_for_tests( // that they are matched properly by get_rooted_block assert_eq!( solana_ledger::blockstore_processor::process_entries_for_tests( - &bank, + &BankWithScheduler::new_without_scheduler(bank), entries, Some( &solana_ledger::blockstore_processor::TransactionStatusSender { @@ -4962,7 +4963,12 @@ pub mod tests { for (i, root) in roots.iter().enumerate() { let new_bank = Bank::new_from_parent(parent_bank.clone(), parent_bank.collector_id(), *root); - parent_bank = self.bank_forks.write().unwrap().insert(new_bank); + parent_bank = self + .bank_forks + .write() + .unwrap() + .insert(new_bank) + .clone_without_scheduler(); let parent = if i > 0 { roots[i - 1] } else { 0 }; fill_blockstore_slot_with_ticks( &self.blockstore, @@ -5004,7 +5010,8 @@ pub mod tests { .bank_forks .write() .unwrap() - .insert(Bank::new_from_parent(parent_bank, &Pubkey::default(), slot)); + .insert(Bank::new_from_parent(parent_bank, &Pubkey::default(), slot)) + .clone_without_scheduler(); let new_block_commitment = BlockCommitmentCache::new( HashMap::new(), diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 8402c2f05..bd0786a9b 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -8513,11 +8513,16 @@ impl Drop for Bank { pub mod test_utils { use { super::Bank, + crate::installed_scheduler_pool::BankWithScheduler, solana_sdk::{hash::hashv, pubkey::Pubkey}, solana_vote_program::vote_state::{self, BlockTimestamp, VoteStateVersions}, std::sync::Arc, }; pub fn goto_end_of_slot(bank: Arc) { + goto_end_of_slot_with_scheduler(&BankWithScheduler::new_without_scheduler(bank)) + } + + pub fn goto_end_of_slot_with_scheduler(bank: &BankWithScheduler) { let mut tick_hash = bank.last_blockhash(); loop { tick_hash = hashv(&[tick_hash.as_ref(), &[42]]); diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index 27abe8006..71315bc4b 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -4,6 +4,7 @@ use { crate::{ accounts_background_service::{AbsRequestSender, SnapshotRequest, SnapshotRequestKind}, bank::{epoch_accounts_hash_utils, Bank, SquashTiming}, + installed_scheduler_pool::BankWithScheduler, snapshot_config::SnapshotConfig, }, log::*, @@ -57,7 +58,7 @@ struct SetRootTimings { #[derive(Debug)] pub struct BankForks { - banks: HashMap>, + banks: HashMap, descendants: HashMap>, root: Arc, @@ -82,8 +83,8 @@ impl BankForks { Self::new_from_banks(&[Arc::new(bank)], root) } - pub fn banks(&self) -> HashMap> { - self.banks.clone() + pub fn banks(&self) -> &HashMap { + &self.banks } pub fn get_vote_only_mode_signal(&self) -> Arc { @@ -119,7 +120,7 @@ impl BankForks { self.banks .iter() .filter(|(_, b)| b.is_frozen()) - .map(|(k, b)| (*k, b.clone())) + .map(|(&k, b)| (k, b.clone_without_scheduler())) .collect() } @@ -131,8 +132,13 @@ impl BankForks { .collect() } + pub fn get_with_scheduler(&self, bank_slot: Slot) -> Option { + self.banks.get(&bank_slot).map(|b| b.clone_with_scheduler()) + } + pub fn get(&self, bank_slot: Slot) -> Option> { - self.banks.get(&bank_slot).cloned() + self.get_with_scheduler(bank_slot) + .map(|b| b.clone_without_scheduler()) } pub fn get_with_checked_hash( @@ -159,10 +165,19 @@ impl BankForks { // Iterate through the heads of all the different forks for bank in initial_forks { - banks.insert(bank.slot(), bank.clone()); + banks.insert( + bank.slot(), + BankWithScheduler::new_without_scheduler(bank.clone()), + ); let parents = bank.parents(); for parent in parents { - if banks.insert(parent.slot(), parent.clone()).is_some() { + if banks + .insert( + parent.slot(), + BankWithScheduler::new_without_scheduler(parent.clone()), + ) + .is_some() + { // All ancestors have already been inserted by another fork break; } @@ -187,12 +202,12 @@ impl BankForks { } } - pub fn insert(&mut self, mut bank: Bank) -> Arc { + pub fn insert(&mut self, mut bank: Bank) -> BankWithScheduler { bank.check_program_modification_slot = self.root.load(Ordering::Relaxed) < self.highest_slot_at_startup; - let bank = Arc::new(bank); - let prev = self.banks.insert(bank.slot(), bank.clone()); + let bank = BankWithScheduler::new_without_scheduler(Arc::new(bank)); + let prev = self.banks.insert(bank.slot(), bank.clone_with_scheduler()); assert!(prev.is_none()); let slot = bank.slot(); self.descendants.entry(slot).or_default(); @@ -202,7 +217,7 @@ impl BankForks { bank } - pub fn insert_from_ledger(&mut self, bank: Bank) -> Arc { + pub fn insert_from_ledger(&mut self, bank: Bank) -> BankWithScheduler { self.highest_slot_at_startup = std::cmp::max(self.highest_slot_at_startup, bank.slot()); self.insert(bank) } @@ -224,7 +239,7 @@ impl BankForks { if entry.get().is_empty() { entry.remove_entry(); } - Some(bank) + Some(bank.clone_without_scheduler()) } pub fn highest_slot(&self) -> Slot { @@ -235,6 +250,10 @@ impl BankForks { self[self.highest_slot()].clone() } + pub fn working_bank_with_scheduler(&self) -> &BankWithScheduler { + &self.banks[&self.highest_slot()] + } + fn do_set_root_return_metrics( &mut self, root: Slot, @@ -247,9 +266,8 @@ impl BankForks { // ensure atomic ordering correctness. self.root.store(root, Ordering::Release); - let root_bank = self - .banks - .get(&root) + let root_bank = &self + .get(root) .expect("root bank didn't exist in bank_forks"); let new_epoch = root_bank.epoch(); if old_epoch != new_epoch { diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs new file mode 100644 index 000000000..9fd3a5546 --- /dev/null +++ b/runtime/src/installed_scheduler_pool.rs @@ -0,0 +1,84 @@ +//! Currently, there's only one auxiliary type called BankWithScheduler.. This file will be +//! populated by later PRs to align with the filename. + +#[cfg(feature = "dev-context-only-utils")] +use qualifier_attr::qualifiers; +use { + crate::bank::Bank, + std::{ + fmt::Debug, + ops::Deref, + sync::{Arc, RwLock}, + }, +}; + +// currently dummy type; will be replaced with the introduction of real type by upcoming pr... +pub type DefaultInstalledSchedulerBox = (); + +/// Very thin wrapper around Arc +/// +/// It brings type-safety against accidental mixing of bank and scheduler with different slots, +/// which is a pretty dangerous condition. Also, it guarantees to call wait_for_termination() via +/// ::drop() inside BankForks::set_root()'s pruning, perfectly matching to Arc's lifetime by +/// piggybacking on the pruning. +/// +/// Semantically, a scheduler is tightly coupled with a particular bank. But scheduler wasn't put +/// into Bank fields to avoid circular-references (a scheduler needs to refer to its accompanied +/// Arc). BankWithScheduler behaves almost like Arc. It only adds a few of transaction +/// scheduling and scheduler management functions. For this reason, `bank` variable names should be +/// used for `BankWithScheduler` across codebase. +/// +/// BankWithScheduler even implements Deref for convenience. And Clone is omitted to implement to +/// avoid ambiguity as to which to clone: BankWithScheduler or Arc. Use +/// clone_without_scheduler() for Arc. Otherwise, use clone_with_scheduler() (this should be +/// unusual outside scheduler code-path) +#[derive(Debug)] +pub struct BankWithScheduler { + inner: Arc, +} + +#[derive(Debug)] +pub struct BankWithSchedulerInner { + bank: Arc, + #[allow(dead_code)] + scheduler: InstalledSchedulerRwLock, +} +pub type InstalledSchedulerRwLock = RwLock>; + +impl BankWithScheduler { + #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] + pub(crate) fn new(bank: Arc, scheduler: Option) -> Self { + Self { + inner: Arc::new(BankWithSchedulerInner { + bank, + scheduler: RwLock::new(scheduler), + }), + } + } + + pub fn new_without_scheduler(bank: Arc) -> Self { + Self::new(bank, None) + } + + pub fn clone_with_scheduler(&self) -> BankWithScheduler { + BankWithScheduler { + inner: self.inner.clone(), + } + } + + pub fn clone_without_scheduler(&self) -> Arc { + self.inner.bank.clone() + } + + pub const fn no_scheduler_available() -> InstalledSchedulerRwLock { + RwLock::new(None) + } +} + +impl Deref for BankWithScheduler { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.inner.bank + } +} diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 503d24410..1bbd47984 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -16,6 +16,7 @@ pub mod epoch_stakes; pub mod genesis_utils; pub mod inline_feature_gate_program; pub mod inline_spl_associated_token_account; +pub mod installed_scheduler_pool; pub mod loader_utils; pub mod non_circulating_supply; pub mod prioritization_fee;