Add BankWithScheduler for upcoming scheduler code (#33704)

* Add BankWithScheduler for upcoming scheduler code

* Remove too confusing insert_without_scheduler()

* Add doc comment as a bonus

* Simplify BankForks::banks()

* Add derive(Debug) on BankWithScheduler
This commit is contained in:
Ryo Onodera 2023-10-21 15:56:43 +09:00 committed by GitHub
parent 01f1bf2799
commit 5a963529a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 270 additions and 105 deletions

View File

@ -37,7 +37,8 @@ use {
}, },
solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry}, solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry},
solana_runtime::{ solana_runtime::{
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, bank::Bank, bank_forks::BankForks, installed_scheduler_pool::BankWithScheduler,
prioritization_fee_cache::PrioritizationFeeCache,
}, },
solana_sdk::{ solana_sdk::{
genesis_config::GenesisConfig, genesis_config::GenesisConfig,
@ -398,6 +399,7 @@ fn simulate_process_entries(
num_accounts: usize, num_accounts: usize,
) { ) {
let bank = Arc::new(Bank::new_for_benches(genesis_config)); let bank = Arc::new(Bank::new_for_benches(genesis_config));
let bank = BankWithScheduler::new_without_scheduler(bank);
for i in 0..(num_accounts / 2) { for i in 0..(num_accounts / 2) {
bank.transfer(initial_lamports, mint_keypair, &keypairs[i * 2].pubkey()) bank.transfer(initial_lamports, mint_keypair, &keypairs[i * 2].pubkey())

View File

@ -61,6 +61,7 @@ use {
bank::{bank_hash_details, Bank, NewBankOptions}, bank::{bank_hash_details, Bank, NewBankOptions},
bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY}, bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY},
commitment::BlockCommitmentCache, commitment::BlockCommitmentCache,
installed_scheduler_pool::BankWithScheduler,
prioritization_fee_cache::PrioritizationFeeCache, prioritization_fee_cache::PrioritizationFeeCache,
}, },
solana_sdk::{ solana_sdk::{
@ -1988,7 +1989,7 @@ impl ReplayStage {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn replay_blockstore_into_bank( fn replay_blockstore_into_bank(
bank: &Arc<Bank>, bank: &BankWithScheduler,
blockstore: &Blockstore, blockstore: &Blockstore,
replay_stats: &RwLock<ReplaySlotStats>, replay_stats: &RwLock<ReplaySlotStats>,
replay_progress: &RwLock<ConfirmationProgress>, replay_progress: &RwLock<ConfirmationProgress>,
@ -2599,7 +2600,11 @@ impl ReplayStage {
return replay_result; return replay_result;
} }
let bank = bank_forks.read().unwrap().get(bank_slot).unwrap(); let bank = bank_forks
.read()
.unwrap()
.get_with_scheduler(bank_slot)
.unwrap();
let parent_slot = bank.parent_slot(); let parent_slot = bank.parent_slot();
let (num_blocks_on_fork, num_dropped_blocks_on_fork) = { let (num_blocks_on_fork, num_dropped_blocks_on_fork) = {
let stats = progress_lock let stats = progress_lock
@ -2687,7 +2692,11 @@ impl ReplayStage {
debug!("bank_slot {:?} is marked dead", bank_slot); debug!("bank_slot {:?} is marked dead", bank_slot);
replay_result.is_slot_dead = true; replay_result.is_slot_dead = true;
} else { } else {
let bank = bank_forks.read().unwrap().get(bank_slot).unwrap(); let bank = bank_forks
.read()
.unwrap()
.get_with_scheduler(bank_slot)
.unwrap();
let parent_slot = bank.parent_slot(); let parent_slot = bank.parent_slot();
let prev_leader_slot = progress.get_bank_prev_leader_slot(&bank); let prev_leader_slot = progress.get_bank_prev_leader_slot(&bank);
let (num_blocks_on_fork, num_dropped_blocks_on_fork) = { let (num_blocks_on_fork, num_dropped_blocks_on_fork) = {
@ -2768,7 +2777,11 @@ impl ReplayStage {
} }
let bank_slot = replay_result.bank_slot; let bank_slot = replay_result.bank_slot;
let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap(); let bank = &bank_forks
.read()
.unwrap()
.get_with_scheduler(bank_slot)
.unwrap();
if let Some(replay_result) = &replay_result.replay_result { if let Some(replay_result) = &replay_result.replay_result {
match replay_result { match replay_result {
Ok(replay_tx_count) => tx_count += replay_tx_count, Ok(replay_tx_count) => tx_count += replay_tx_count,
@ -2826,7 +2839,9 @@ impl ReplayStage {
); );
// report cost tracker stats // report cost tracker stats
cost_update_sender cost_update_sender
.send(CostUpdate::FrozenBank { bank: bank.clone() }) .send(CostUpdate::FrozenBank {
bank: bank.clone_without_scheduler(),
})
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
warn!("cost_update_sender failed sending bank stats: {:?}", err) warn!("cost_update_sender failed sending bank stats: {:?}", err)
}); });
@ -2887,7 +2902,7 @@ impl ReplayStage {
if let Some(sender) = bank_notification_sender { if let Some(sender) = bank_notification_sender {
sender sender
.sender .sender
.send(BankNotification::Frozen(bank.clone())) .send(BankNotification::Frozen(bank.clone_without_scheduler()))
.unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err)); .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
} }
blockstore_processor::cache_block_meta(bank, cache_block_meta_sender); blockstore_processor::cache_block_meta(bank, cache_block_meta_sender);
@ -4747,7 +4762,7 @@ pub(crate) mod tests {
assert_eq!(bank0.tick_height(), bank0.max_tick_height()); assert_eq!(bank0.tick_height(), bank0.max_tick_height());
let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1); let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1); bank_forks.write().unwrap().insert(bank1);
let bank1 = bank_forks.read().unwrap().get(1).unwrap(); let bank1 = bank_forks.read().unwrap().get_with_scheduler(1).unwrap();
let bank1_progress = progress let bank1_progress = progress
.entry(bank1.slot()) .entry(bank1.slot())
.or_insert_with(|| ForkProgress::new(bank1.last_blockhash(), None, None, 0, 0)); .or_insert_with(|| ForkProgress::new(bank1.last_blockhash(), None, None, 0, 0));

View File

@ -599,18 +599,22 @@ fn test_epoch_accounts_hash_and_warping() {
); );
// flush the write cache so warping can calculate the accounts hash from storages // flush the write cache so warping can calculate the accounts hash from storages
bank.force_flush_accounts_cache(); bank.force_flush_accounts_cache();
let bank = bank_forks.write().unwrap().insert(Bank::warp_from_parent( let bank = bank_forks
bank, .write()
&Pubkey::default(), .unwrap()
eah_stop_slot_in_next_epoch, .insert(Bank::warp_from_parent(
CalcAccountsHashDataSource::Storages, bank,
)); &Pubkey::default(),
eah_stop_slot_in_next_epoch,
CalcAccountsHashDataSource::Storages,
))
.clone_without_scheduler();
let slot = bank.slot().checked_add(1).unwrap(); let slot = bank.slot().checked_add(1).unwrap();
let bank = let bank = bank_forks
bank_forks .write()
.write() .unwrap()
.unwrap() .insert(Bank::new_from_parent(bank, &Pubkey::default(), slot))
.insert(Bank::new_from_parent(bank, &Pubkey::default(), slot)); .clone_without_scheduler();
bank_forks.write().unwrap().set_root( bank_forks.write().unwrap().set_root(
bank.slot(), bank.slot(),
&test_environment &test_environment
@ -634,18 +638,22 @@ fn test_epoch_accounts_hash_and_warping() {
epoch_schedule.get_first_slot_in_epoch(bank.epoch() + 1) + eah_start_offset; epoch_schedule.get_first_slot_in_epoch(bank.epoch() + 1) + eah_start_offset;
// flush the write cache so warping can calculate the accounts hash from storages // flush the write cache so warping can calculate the accounts hash from storages
bank.force_flush_accounts_cache(); bank.force_flush_accounts_cache();
let bank = bank_forks.write().unwrap().insert(Bank::warp_from_parent( let bank = bank_forks
bank, .write()
&Pubkey::default(), .unwrap()
eah_start_slot_in_next_epoch, .insert(Bank::warp_from_parent(
CalcAccountsHashDataSource::Storages, bank,
)); &Pubkey::default(),
eah_start_slot_in_next_epoch,
CalcAccountsHashDataSource::Storages,
))
.clone_without_scheduler();
let slot = bank.slot().checked_add(1).unwrap(); let slot = bank.slot().checked_add(1).unwrap();
let bank = let bank = bank_forks
bank_forks .write()
.write() .unwrap()
.unwrap() .insert(Bank::new_from_parent(bank, &Pubkey::default(), slot))
.insert(Bank::new_from_parent(bank, &Pubkey::default(), slot)); .clone_without_scheduler();
bank_forks.write().unwrap().set_root( bank_forks.write().unwrap().set_root(
bank.slot(), bank.slot(),
&test_environment &test_environment

View File

@ -39,6 +39,7 @@ use {
bank_forks::BankForks, bank_forks::BankForks,
bank_utils, bank_utils,
commitment::VOTE_THRESHOLD_SIZE, commitment::VOTE_THRESHOLD_SIZE,
installed_scheduler_pool::BankWithScheduler,
prioritization_fee_cache::PrioritizationFeeCache, prioritization_fee_cache::PrioritizationFeeCache,
runtime_config::RuntimeConfig, runtime_config::RuntimeConfig,
transaction_batch::TransactionBatch, transaction_batch::TransactionBatch,
@ -418,13 +419,13 @@ fn execute_batches(
/// This method is for use testing against a single Bank, and assumes `Bank::transaction_count()` /// This method is for use testing against a single Bank, and assumes `Bank::transaction_count()`
/// represents the number of transactions executed in this Bank /// represents the number of transactions executed in this Bank
pub fn process_entries_for_tests( pub fn process_entries_for_tests(
bank: &Arc<Bank>, bank: &BankWithScheduler,
entries: Vec<Entry>, entries: Vec<Entry>,
transaction_status_sender: Option<&TransactionStatusSender>, transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>, replay_vote_sender: Option<&ReplayVoteSender>,
) -> Result<()> { ) -> Result<()> {
let verify_transaction = { let verify_transaction = {
let bank = bank.clone(); let bank = bank.clone_with_scheduler();
move |versioned_tx: VersionedTransaction| -> Result<SanitizedTransaction> { move |versioned_tx: VersionedTransaction| -> Result<SanitizedTransaction> {
bank.verify_transaction(versioned_tx, TransactionVerificationMode::FullVerification) bank.verify_transaction(versioned_tx, TransactionVerificationMode::FullVerification)
} }
@ -463,7 +464,7 @@ pub fn process_entries_for_tests(
} }
fn process_entries( fn process_entries(
bank: &Arc<Bank>, bank: &BankWithScheduler,
entries: &mut [ReplayEntry], entries: &mut [ReplayEntry],
transaction_status_sender: Option<&TransactionStatusSender>, transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>, replay_vote_sender: Option<&ReplayVoteSender>,
@ -715,11 +716,16 @@ pub(crate) fn process_blockstore_for_bank_0(
accounts_update_notifier, accounts_update_notifier,
exit, exit,
); );
let bank0_slot = bank0.slot();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0)));
info!("Processing ledger for slot 0..."); info!("Processing ledger for slot 0...");
process_bank_0( process_bank_0(
&bank_forks.read().unwrap().root_bank(), &bank_forks
.read()
.unwrap()
.get_with_scheduler(bank0_slot)
.unwrap(),
blockstore, blockstore,
opts, opts,
&VerifyRecyclers::default(), &VerifyRecyclers::default(),
@ -889,7 +895,7 @@ fn verify_ticks(
fn confirm_full_slot( fn confirm_full_slot(
blockstore: &Blockstore, blockstore: &Blockstore,
bank: &Arc<Bank>, bank: &BankWithScheduler,
opts: &ProcessOptions, opts: &ProcessOptions,
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
progress: &mut ConfirmationProgress, progress: &mut ConfirmationProgress,
@ -1050,7 +1056,7 @@ impl ConfirmationProgress {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn confirm_slot( pub fn confirm_slot(
blockstore: &Blockstore, blockstore: &Blockstore,
bank: &Arc<Bank>, bank: &BankWithScheduler,
timing: &mut ConfirmationTiming, timing: &mut ConfirmationTiming,
progress: &mut ConfirmationProgress, progress: &mut ConfirmationProgress,
skip_verification: bool, skip_verification: bool,
@ -1095,7 +1101,7 @@ pub fn confirm_slot(
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn confirm_slot_entries( fn confirm_slot_entries(
bank: &Arc<Bank>, bank: &BankWithScheduler,
slot_entries_load_result: (Vec<Entry>, u64, bool), slot_entries_load_result: (Vec<Entry>, u64, bool),
timing: &mut ConfirmationTiming, timing: &mut ConfirmationTiming,
progress: &mut ConfirmationProgress, progress: &mut ConfirmationProgress,
@ -1192,7 +1198,7 @@ fn confirm_slot_entries(
}; };
let verify_transaction = { let verify_transaction = {
let bank = bank.clone(); let bank = bank.clone_with_scheduler();
move |versioned_tx: VersionedTransaction, move |versioned_tx: VersionedTransaction,
verification_mode: TransactionVerificationMode| verification_mode: TransactionVerificationMode|
-> Result<SanitizedTransaction> { -> Result<SanitizedTransaction> {
@ -1291,7 +1297,7 @@ fn confirm_slot_entries(
// Special handling required for processing the entries in slot 0 // Special handling required for processing the entries in slot 0
fn process_bank_0( fn process_bank_0(
bank0: &Arc<Bank>, bank0: &BankWithScheduler,
blockstore: &Blockstore, blockstore: &Blockstore,
opts: &ProcessOptions, opts: &ProcessOptions,
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
@ -1490,7 +1496,7 @@ fn load_frozen_forks(
// Block must be frozen by this point; otherwise, // Block must be frozen by this point; otherwise,
// process_single_slot() would have errored above. // process_single_slot() would have errored above.
assert!(bank.is_frozen()); assert!(bank.is_frozen());
all_banks.insert(bank.slot(), bank.clone()); all_banks.insert(bank.slot(), bank.clone_with_scheduler());
m.stop(); m.stop();
process_single_slot_us += m.as_us(); process_single_slot_us += m.as_us();
@ -1520,7 +1526,7 @@ fn load_frozen_forks(
// Ensure cluster-confirmed root and parents are set as root in blockstore // Ensure cluster-confirmed root and parents are set as root in blockstore
let mut rooted_slots = vec![]; let mut rooted_slots = vec![];
let mut new_root_bank = cluster_root_bank.clone(); let mut new_root_bank = cluster_root_bank.clone_without_scheduler();
loop { loop {
if new_root_bank.slot() == root { break; } // Found the last root in the chain, yay! if new_root_bank.slot() == root { break; } // Found the last root in the chain, yay!
assert!(new_root_bank.slot() > root); assert!(new_root_bank.slot() > root);
@ -1675,7 +1681,7 @@ fn supermajority_root_from_vote_accounts(
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn process_single_slot( fn process_single_slot(
blockstore: &Blockstore, blockstore: &Blockstore,
bank: &Arc<Bank>, bank: &BankWithScheduler,
opts: &ProcessOptions, opts: &ProcessOptions,
recyclers: &VerifyRecyclers, recyclers: &VerifyRecyclers,
progress: &mut ConfirmationProgress, progress: &mut ConfirmationProgress,
@ -1907,6 +1913,18 @@ pub mod tests {
} }
} }
fn process_entries_for_tests_without_scheduler(
bank: &Arc<Bank>,
entries: Vec<Entry>,
) -> Result<()> {
process_entries_for_tests(
&BankWithScheduler::new_without_scheduler(bank.clone()),
entries,
None,
None,
)
}
#[test] #[test]
fn test_process_blockstore_with_missing_hashes() { fn test_process_blockstore_with_missing_hashes() {
do_test_process_blockstore_with_missing_hashes(AccessType::Primary); do_test_process_blockstore_with_missing_hashes(AccessType::Primary);
@ -2602,7 +2620,7 @@ pub mod tests {
); );
// Now ensure the TX is accepted despite pointing to the ID of an empty entry. // Now ensure the TX is accepted despite pointing to the ID of an empty entry.
process_entries_for_tests(&bank, slot_entries, None, None).unwrap(); process_entries_for_tests_without_scheduler(&bank, slot_entries).unwrap();
assert_eq!(bank.process_transaction(&tx), Ok(())); assert_eq!(bank.process_transaction(&tx), Ok(()));
} }
@ -2737,7 +2755,7 @@ pub mod tests {
assert_eq!(bank.tick_height(), 0); assert_eq!(bank.tick_height(), 0);
let tick = next_entry(&genesis_config.hash(), 1, vec![]); let tick = next_entry(&genesis_config.hash(), 1, vec![]);
assert_eq!( assert_eq!(
process_entries_for_tests(&bank, vec![tick], None, None), process_entries_for_tests_without_scheduler(&bank, vec![tick]),
Ok(()) Ok(())
); );
assert_eq!(bank.tick_height(), 1); assert_eq!(bank.tick_height(), 1);
@ -2772,7 +2790,7 @@ pub mod tests {
); );
let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]); let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]);
assert_eq!( assert_eq!(
process_entries_for_tests(&bank, vec![entry_1, entry_2], None, None), process_entries_for_tests_without_scheduler(&bank, vec![entry_1, entry_2]),
Ok(()) Ok(())
); );
assert_eq!(bank.get_balance(&keypair1.pubkey()), 2); assert_eq!(bank.get_balance(&keypair1.pubkey()), 2);
@ -2828,11 +2846,9 @@ pub mod tests {
); );
assert_eq!( assert_eq!(
process_entries_for_tests( process_entries_for_tests_without_scheduler(
&bank, &bank,
vec![entry_1_to_mint, entry_2_to_3_mint_to_1], vec![entry_1_to_mint, entry_2_to_3_mint_to_1],
None,
None,
), ),
Ok(()) Ok(())
); );
@ -2899,11 +2915,9 @@ pub mod tests {
], ],
); );
assert!(process_entries_for_tests( assert!(process_entries_for_tests_without_scheduler(
&bank, &bank,
vec![entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()], vec![entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()],
None,
None,
) )
.is_err()); .is_err());
@ -3017,7 +3031,7 @@ pub mod tests {
let entry = next_entry(&bank.last_blockhash(), 1, vec![tx]); let entry = next_entry(&bank.last_blockhash(), 1, vec![tx]);
let bank = Arc::new(bank); let bank = Arc::new(bank);
let result = process_entries_for_tests(&bank, vec![entry], None, None); let result = process_entries_for_tests_without_scheduler(&bank, vec![entry]);
bank.freeze(); bank.freeze();
let blockhash_ok = bank.last_blockhash(); let blockhash_ok = bank.last_blockhash();
let bankhash_ok = bank.hash(); let bankhash_ok = bank.hash();
@ -3058,7 +3072,7 @@ pub mod tests {
let entry = next_entry(&bank.last_blockhash(), 1, vec![tx]); let entry = next_entry(&bank.last_blockhash(), 1, vec![tx]);
let bank = Arc::new(bank); let bank = Arc::new(bank);
let _result = process_entries_for_tests(&bank, vec![entry], None, None); let _result = process_entries_for_tests_without_scheduler(&bank, vec![entry]);
bank.freeze(); bank.freeze();
assert_eq!(blockhash_ok, bank.last_blockhash()); assert_eq!(blockhash_ok, bank.last_blockhash());
@ -3150,15 +3164,13 @@ pub mod tests {
// keypair2=3 // keypair2=3
// keypair3=3 // keypair3=3
assert!(process_entries_for_tests( assert!(process_entries_for_tests_without_scheduler(
&bank, &bank,
vec![ vec![
entry_1_to_mint, entry_1_to_mint,
entry_2_to_3_and_1_to_mint, entry_2_to_3_and_1_to_mint,
entry_conflict_itself, entry_conflict_itself,
], ],
None,
None,
) )
.is_err()); .is_err());
@ -3206,7 +3218,7 @@ pub mod tests {
system_transaction::transfer(&keypair2, &keypair4.pubkey(), 1, bank.last_blockhash()); system_transaction::transfer(&keypair2, &keypair4.pubkey(), 1, bank.last_blockhash());
let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]); let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]);
assert_eq!( assert_eq!(
process_entries_for_tests(&bank, vec![entry_1, entry_2], None, None), process_entries_for_tests_without_scheduler(&bank, vec![entry_1, entry_2]),
Ok(()) Ok(())
); );
assert_eq!(bank.get_balance(&keypair3.pubkey()), 1); assert_eq!(bank.get_balance(&keypair3.pubkey()), 1);
@ -3267,7 +3279,7 @@ pub mod tests {
}) })
.collect(); .collect();
assert_eq!( assert_eq!(
process_entries_for_tests(&bank, entries, None, None), process_entries_for_tests_without_scheduler(&bank, entries),
Ok(()) Ok(())
); );
} }
@ -3330,7 +3342,7 @@ pub mod tests {
// Transfer lamports to each other // Transfer lamports to each other
let entry = next_entry(&bank.last_blockhash(), 1, tx_vector); let entry = next_entry(&bank.last_blockhash(), 1, tx_vector);
assert_eq!( assert_eq!(
process_entries_for_tests(&bank, vec![entry], None, None), process_entries_for_tests_without_scheduler(&bank, vec![entry]),
Ok(()) Ok(())
); );
bank.squash(); bank.squash();
@ -3390,7 +3402,10 @@ pub mod tests {
system_transaction::transfer(&keypair1, &keypair4.pubkey(), 1, bank.last_blockhash()); system_transaction::transfer(&keypair1, &keypair4.pubkey(), 1, bank.last_blockhash());
let entry_2 = next_entry(&tick.hash, 1, vec![tx]); let entry_2 = next_entry(&tick.hash, 1, vec![tx]);
assert_eq!( assert_eq!(
process_entries_for_tests(&bank, vec![entry_1, tick, entry_2.clone()], None, None,), process_entries_for_tests_without_scheduler(
&bank,
vec![entry_1, tick, entry_2.clone()],
),
Ok(()) Ok(())
); );
assert_eq!(bank.get_balance(&keypair3.pubkey()), 1); assert_eq!(bank.get_balance(&keypair3.pubkey()), 1);
@ -3401,7 +3416,7 @@ pub mod tests {
system_transaction::transfer(&keypair2, &keypair3.pubkey(), 1, bank.last_blockhash()); system_transaction::transfer(&keypair2, &keypair3.pubkey(), 1, bank.last_blockhash());
let entry_3 = next_entry(&entry_2.hash, 1, vec![tx]); let entry_3 = next_entry(&entry_2.hash, 1, vec![tx]);
assert_eq!( assert_eq!(
process_entries_for_tests(&bank, vec![entry_3], None, None), process_entries_for_tests_without_scheduler(&bank, vec![entry_3]),
Err(TransactionError::AccountNotFound) Err(TransactionError::AccountNotFound)
); );
} }
@ -3481,7 +3496,7 @@ pub mod tests {
); );
assert_eq!( assert_eq!(
process_entries_for_tests(&bank, vec![entry_1_to_mint], None, None), process_entries_for_tests_without_scheduler(&bank, vec![entry_1_to_mint]),
Err(TransactionError::AccountInUse) Err(TransactionError::AccountInUse)
); );
@ -3560,7 +3575,7 @@ pub mod tests {
// Set up bank1 // Set up bank1
let mut bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config)); let mut bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config));
let bank0 = bank_forks.get(0).unwrap(); let bank0 = bank_forks.get_with_scheduler(0).unwrap();
let opts = ProcessOptions { let opts = ProcessOptions {
run_verification: true, run_verification: true,
accounts_db_test_hash_calculation: true, accounts_db_test_hash_calculation: true,
@ -3569,7 +3584,11 @@ pub mod tests {
let recyclers = VerifyRecyclers::default(); let recyclers = VerifyRecyclers::default();
process_bank_0(&bank0, &blockstore, &opts, &recyclers, None, None); process_bank_0(&bank0, &blockstore, &opts, &recyclers, None, None);
let bank0_last_blockhash = bank0.last_blockhash(); let bank0_last_blockhash = bank0.last_blockhash();
let bank1 = bank_forks.insert(Bank::new_from_parent(bank0, &Pubkey::default(), 1)); let bank1 = bank_forks.insert(Bank::new_from_parent(
bank0.clone_without_scheduler(),
&Pubkey::default(),
1,
));
confirm_full_slot( confirm_full_slot(
&blockstore, &blockstore,
&bank1, &bank1,
@ -3684,7 +3703,7 @@ pub mod tests {
}) })
.collect(); .collect();
info!("paying iteration {}", i); info!("paying iteration {}", i);
process_entries_for_tests(&bank, entries, None, None).expect("paying failed"); process_entries_for_tests_without_scheduler(&bank, entries).expect("paying failed");
let entries: Vec<_> = (0..NUM_TRANSFERS) let entries: Vec<_> = (0..NUM_TRANSFERS)
.step_by(NUM_TRANSFERS_PER_ENTRY) .step_by(NUM_TRANSFERS_PER_ENTRY)
@ -3707,16 +3726,14 @@ pub mod tests {
.collect(); .collect();
info!("refunding iteration {}", i); info!("refunding iteration {}", i);
process_entries_for_tests(&bank, entries, None, None).expect("refunding failed"); process_entries_for_tests_without_scheduler(&bank, entries).expect("refunding failed");
// advance to next block // advance to next block
process_entries_for_tests( process_entries_for_tests_without_scheduler(
&bank, &bank,
(0..bank.ticks_per_slot()) (0..bank.ticks_per_slot())
.map(|_| next_entry_mut(&mut hash, 1, vec![])) .map(|_| next_entry_mut(&mut hash, 1, vec![]))
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
None,
None,
) )
.expect("process ticks failed"); .expect("process ticks failed");
@ -3756,7 +3773,7 @@ pub mod tests {
let entry = next_entry(&new_blockhash, 1, vec![tx]); let entry = next_entry(&new_blockhash, 1, vec![tx]);
entries.push(entry); entries.push(entry);
process_entries_for_tests(&bank0, entries, None, None).unwrap(); process_entries_for_tests_without_scheduler(&bank0, entries).unwrap();
assert_eq!(bank0.get_balance(&keypair.pubkey()), 1) assert_eq!(bank0.get_balance(&keypair.pubkey()), 1)
} }
@ -3922,7 +3939,12 @@ pub mod tests {
.collect(); .collect();
let entry = next_entry(&bank_1_blockhash, 1, vote_txs); let entry = next_entry(&bank_1_blockhash, 1, vote_txs);
let (replay_vote_sender, replay_vote_receiver) = crossbeam_channel::unbounded(); let (replay_vote_sender, replay_vote_receiver) = crossbeam_channel::unbounded();
let _ = process_entries_for_tests(&bank1, vec![entry], None, Some(&replay_vote_sender)); let _ = process_entries_for_tests(
&BankWithScheduler::new_without_scheduler(bank1),
vec![entry],
None,
Some(&replay_vote_sender),
);
let successes: BTreeSet<Pubkey> = replay_vote_receiver let successes: BTreeSet<Pubkey> = replay_vote_receiver
.try_iter() .try_iter()
.map(|(vote_pubkey, ..)| vote_pubkey) .map(|(vote_pubkey, ..)| vote_pubkey)
@ -4210,7 +4232,7 @@ pub mod tests {
prev_entry_hash: Hash, prev_entry_hash: Hash,
) -> result::Result<(), BlockstoreProcessorError> { ) -> result::Result<(), BlockstoreProcessorError> {
confirm_slot_entries( confirm_slot_entries(
bank, &BankWithScheduler::new_without_scheduler(bank.clone()),
(slot_entries, 0, slot_full), (slot_entries, 0, slot_full),
&mut ConfirmationTiming::default(), &mut ConfirmationTiming::default(),
&mut ConfirmationProgress::new(prev_entry_hash), &mut ConfirmationProgress::new(prev_entry_hash),
@ -4232,7 +4254,9 @@ pub mod tests {
.. ..
} = create_genesis_config(100 * LAMPORTS_PER_SOL); } = create_genesis_config(100 * LAMPORTS_PER_SOL);
let genesis_hash = genesis_config.hash(); let genesis_hash = genesis_config.hash();
let bank = Arc::new(Bank::new_for_tests(&genesis_config)); let bank = BankWithScheduler::new_without_scheduler(Arc::new(Bank::new_for_tests(
&genesis_config,
)));
let mut timing = ConfirmationTiming::default(); let mut timing = ConfirmationTiming::default();
let mut progress = ConfirmationProgress::new(genesis_hash); let mut progress = ConfirmationProgress::new(genesis_hash);
let amount = genesis_config.rent.minimum_balance(0); let amount = genesis_config.rent.minimum_balance(0);

View File

@ -25,7 +25,7 @@ use {
}, },
solana_measure::{measure, measure_us}, solana_measure::{measure, measure_us},
solana_metrics::poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo}, solana_metrics::poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo},
solana_runtime::bank::Bank, solana_runtime::{bank::Bank, installed_scheduler_pool::BankWithScheduler},
solana_sdk::{ solana_sdk::{
clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
hash::Hash, hash::Hash,
@ -264,9 +264,8 @@ impl PohRecorderBank {
} }
} }
#[derive(Clone)]
pub struct WorkingBank { pub struct WorkingBank {
pub bank: Arc<Bank>, pub bank: BankWithScheduler,
pub start: Arc<Instant>, pub start: Arc<Instant>,
pub min_tick_height: u64, pub min_tick_height: u64,
pub max_tick_height: u64, pub max_tick_height: u64,
@ -596,7 +595,7 @@ impl PohRecorder {
self.leader_last_tick_height = leader_last_tick_height; self.leader_last_tick_height = leader_last_tick_height;
} }
pub fn set_bank(&mut self, bank: Arc<Bank>, track_transaction_indexes: bool) { pub fn set_bank(&mut self, bank: BankWithScheduler, track_transaction_indexes: bool) {
assert!(self.working_bank.is_none()); assert!(self.working_bank.is_none());
self.leader_bank_notifier.set_in_progress(&bank); self.leader_bank_notifier.set_in_progress(&bank);
let working_bank = WorkingBank { let working_bank = WorkingBank {
@ -644,12 +643,12 @@ impl PohRecorder {
#[cfg(feature = "dev-context-only-utils")] #[cfg(feature = "dev-context-only-utils")]
pub fn set_bank_for_test(&mut self, bank: Arc<Bank>) { pub fn set_bank_for_test(&mut self, bank: Arc<Bank>) {
self.set_bank(bank, false) self.set_bank(BankWithScheduler::new_without_scheduler(bank), false)
} }
#[cfg(test)] #[cfg(test)]
pub fn set_bank_with_transaction_index_for_test(&mut self, bank: Arc<Bank>) { pub fn set_bank_with_transaction_index_for_test(&mut self, bank: Arc<Bank>) {
self.set_bank(bank, true) self.set_bank(BankWithScheduler::new_without_scheduler(bank), true)
} }
// Flush cache will delay flushing the cache for a bank until it past the WorkingBank::min_tick_height // Flush cache will delay flushing the cache for a bank until it past the WorkingBank::min_tick_height
@ -1092,7 +1091,7 @@ pub fn create_test_recorder(
); );
let ticks_per_slot = bank.ticks_per_slot(); let ticks_per_slot = bank.ticks_per_slot();
poh_recorder.set_bank(bank, false); poh_recorder.set_bank(BankWithScheduler::new_without_scheduler(bank), false);
let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let poh_service = PohService::new( let poh_service = PohService::new(
poh_recorder.clone(), poh_recorder.clone(),

View File

@ -1129,13 +1129,15 @@ impl ProgramTestContext {
bank.freeze(); bank.freeze();
bank bank
} else { } else {
bank_forks.insert(Bank::warp_from_parent( bank_forks
bank, .insert(Bank::warp_from_parent(
&Pubkey::default(), bank,
pre_warp_slot, &Pubkey::default(),
// some warping tests cannot use the append vecs because of the sequence of adding roots and flushing pre_warp_slot,
solana_accounts_db::accounts_db::CalcAccountsHashDataSource::IndexForTests, // some warping tests cannot use the append vecs because of the sequence of adding roots and flushing
)) solana_accounts_db::accounts_db::CalcAccountsHashDataSource::IndexForTests,
))
.clone_without_scheduler()
}; };
let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded(); let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();

View File

@ -50,6 +50,7 @@ use {
bank::{Bank, TransactionSimulationResult}, bank::{Bank, TransactionSimulationResult},
bank_forks::BankForks, bank_forks::BankForks,
commitment::{BlockCommitmentArray, BlockCommitmentCache, CommitmentSlots}, commitment::{BlockCommitmentArray, BlockCommitmentCache, CommitmentSlots},
installed_scheduler_pool::BankWithScheduler,
non_circulating_supply::calculate_non_circulating_supply, non_circulating_supply::calculate_non_circulating_supply,
prioritization_fee_cache::PrioritizationFeeCache, prioritization_fee_cache::PrioritizationFeeCache,
snapshot_config::SnapshotConfig, snapshot_config::SnapshotConfig,
@ -4602,7 +4603,7 @@ pub fn populate_blockstore_for_tests(
// that they are matched properly by get_rooted_block // that they are matched properly by get_rooted_block
assert_eq!( assert_eq!(
solana_ledger::blockstore_processor::process_entries_for_tests( solana_ledger::blockstore_processor::process_entries_for_tests(
&bank, &BankWithScheduler::new_without_scheduler(bank),
entries, entries,
Some( Some(
&solana_ledger::blockstore_processor::TransactionStatusSender { &solana_ledger::blockstore_processor::TransactionStatusSender {
@ -4962,7 +4963,12 @@ pub mod tests {
for (i, root) in roots.iter().enumerate() { for (i, root) in roots.iter().enumerate() {
let new_bank = let new_bank =
Bank::new_from_parent(parent_bank.clone(), parent_bank.collector_id(), *root); Bank::new_from_parent(parent_bank.clone(), parent_bank.collector_id(), *root);
parent_bank = self.bank_forks.write().unwrap().insert(new_bank); parent_bank = self
.bank_forks
.write()
.unwrap()
.insert(new_bank)
.clone_without_scheduler();
let parent = if i > 0 { roots[i - 1] } else { 0 }; let parent = if i > 0 { roots[i - 1] } else { 0 };
fill_blockstore_slot_with_ticks( fill_blockstore_slot_with_ticks(
&self.blockstore, &self.blockstore,
@ -5004,7 +5010,8 @@ pub mod tests {
.bank_forks .bank_forks
.write() .write()
.unwrap() .unwrap()
.insert(Bank::new_from_parent(parent_bank, &Pubkey::default(), slot)); .insert(Bank::new_from_parent(parent_bank, &Pubkey::default(), slot))
.clone_without_scheduler();
let new_block_commitment = BlockCommitmentCache::new( let new_block_commitment = BlockCommitmentCache::new(
HashMap::new(), HashMap::new(),

View File

@ -8513,11 +8513,16 @@ impl Drop for Bank {
pub mod test_utils { pub mod test_utils {
use { use {
super::Bank, super::Bank,
crate::installed_scheduler_pool::BankWithScheduler,
solana_sdk::{hash::hashv, pubkey::Pubkey}, solana_sdk::{hash::hashv, pubkey::Pubkey},
solana_vote_program::vote_state::{self, BlockTimestamp, VoteStateVersions}, solana_vote_program::vote_state::{self, BlockTimestamp, VoteStateVersions},
std::sync::Arc, std::sync::Arc,
}; };
pub fn goto_end_of_slot(bank: Arc<Bank>) { pub fn goto_end_of_slot(bank: Arc<Bank>) {
goto_end_of_slot_with_scheduler(&BankWithScheduler::new_without_scheduler(bank))
}
pub fn goto_end_of_slot_with_scheduler(bank: &BankWithScheduler) {
let mut tick_hash = bank.last_blockhash(); let mut tick_hash = bank.last_blockhash();
loop { loop {
tick_hash = hashv(&[tick_hash.as_ref(), &[42]]); tick_hash = hashv(&[tick_hash.as_ref(), &[42]]);

View File

@ -4,6 +4,7 @@ use {
crate::{ crate::{
accounts_background_service::{AbsRequestSender, SnapshotRequest, SnapshotRequestKind}, accounts_background_service::{AbsRequestSender, SnapshotRequest, SnapshotRequestKind},
bank::{epoch_accounts_hash_utils, Bank, SquashTiming}, bank::{epoch_accounts_hash_utils, Bank, SquashTiming},
installed_scheduler_pool::BankWithScheduler,
snapshot_config::SnapshotConfig, snapshot_config::SnapshotConfig,
}, },
log::*, log::*,
@ -57,7 +58,7 @@ struct SetRootTimings {
#[derive(Debug)] #[derive(Debug)]
pub struct BankForks { pub struct BankForks {
banks: HashMap<Slot, Arc<Bank>>, banks: HashMap<Slot, BankWithScheduler>,
descendants: HashMap<Slot, HashSet<Slot>>, descendants: HashMap<Slot, HashSet<Slot>>,
root: Arc<AtomicSlot>, root: Arc<AtomicSlot>,
@ -82,8 +83,8 @@ impl BankForks {
Self::new_from_banks(&[Arc::new(bank)], root) Self::new_from_banks(&[Arc::new(bank)], root)
} }
pub fn banks(&self) -> HashMap<Slot, Arc<Bank>> { pub fn banks(&self) -> &HashMap<Slot, BankWithScheduler> {
self.banks.clone() &self.banks
} }
pub fn get_vote_only_mode_signal(&self) -> Arc<AtomicBool> { pub fn get_vote_only_mode_signal(&self) -> Arc<AtomicBool> {
@ -119,7 +120,7 @@ impl BankForks {
self.banks self.banks
.iter() .iter()
.filter(|(_, b)| b.is_frozen()) .filter(|(_, b)| b.is_frozen())
.map(|(k, b)| (*k, b.clone())) .map(|(&k, b)| (k, b.clone_without_scheduler()))
.collect() .collect()
} }
@ -131,8 +132,13 @@ impl BankForks {
.collect() .collect()
} }
pub fn get_with_scheduler(&self, bank_slot: Slot) -> Option<BankWithScheduler> {
self.banks.get(&bank_slot).map(|b| b.clone_with_scheduler())
}
pub fn get(&self, bank_slot: Slot) -> Option<Arc<Bank>> { pub fn get(&self, bank_slot: Slot) -> Option<Arc<Bank>> {
self.banks.get(&bank_slot).cloned() self.get_with_scheduler(bank_slot)
.map(|b| b.clone_without_scheduler())
} }
pub fn get_with_checked_hash( pub fn get_with_checked_hash(
@ -159,10 +165,19 @@ impl BankForks {
// Iterate through the heads of all the different forks // Iterate through the heads of all the different forks
for bank in initial_forks { for bank in initial_forks {
banks.insert(bank.slot(), bank.clone()); banks.insert(
bank.slot(),
BankWithScheduler::new_without_scheduler(bank.clone()),
);
let parents = bank.parents(); let parents = bank.parents();
for parent in parents { for parent in parents {
if banks.insert(parent.slot(), parent.clone()).is_some() { if banks
.insert(
parent.slot(),
BankWithScheduler::new_without_scheduler(parent.clone()),
)
.is_some()
{
// All ancestors have already been inserted by another fork // All ancestors have already been inserted by another fork
break; break;
} }
@ -187,12 +202,12 @@ impl BankForks {
} }
} }
pub fn insert(&mut self, mut bank: Bank) -> Arc<Bank> { pub fn insert(&mut self, mut bank: Bank) -> BankWithScheduler {
bank.check_program_modification_slot = bank.check_program_modification_slot =
self.root.load(Ordering::Relaxed) < self.highest_slot_at_startup; self.root.load(Ordering::Relaxed) < self.highest_slot_at_startup;
let bank = Arc::new(bank); let bank = BankWithScheduler::new_without_scheduler(Arc::new(bank));
let prev = self.banks.insert(bank.slot(), bank.clone()); let prev = self.banks.insert(bank.slot(), bank.clone_with_scheduler());
assert!(prev.is_none()); assert!(prev.is_none());
let slot = bank.slot(); let slot = bank.slot();
self.descendants.entry(slot).or_default(); self.descendants.entry(slot).or_default();
@ -202,7 +217,7 @@ impl BankForks {
bank bank
} }
pub fn insert_from_ledger(&mut self, bank: Bank) -> Arc<Bank> { pub fn insert_from_ledger(&mut self, bank: Bank) -> BankWithScheduler {
self.highest_slot_at_startup = std::cmp::max(self.highest_slot_at_startup, bank.slot()); self.highest_slot_at_startup = std::cmp::max(self.highest_slot_at_startup, bank.slot());
self.insert(bank) self.insert(bank)
} }
@ -224,7 +239,7 @@ impl BankForks {
if entry.get().is_empty() { if entry.get().is_empty() {
entry.remove_entry(); entry.remove_entry();
} }
Some(bank) Some(bank.clone_without_scheduler())
} }
pub fn highest_slot(&self) -> Slot { pub fn highest_slot(&self) -> Slot {
@ -235,6 +250,10 @@ impl BankForks {
self[self.highest_slot()].clone() self[self.highest_slot()].clone()
} }
pub fn working_bank_with_scheduler(&self) -> &BankWithScheduler {
&self.banks[&self.highest_slot()]
}
fn do_set_root_return_metrics( fn do_set_root_return_metrics(
&mut self, &mut self,
root: Slot, root: Slot,
@ -247,9 +266,8 @@ impl BankForks {
// ensure atomic ordering correctness. // ensure atomic ordering correctness.
self.root.store(root, Ordering::Release); self.root.store(root, Ordering::Release);
let root_bank = self let root_bank = &self
.banks .get(root)
.get(&root)
.expect("root bank didn't exist in bank_forks"); .expect("root bank didn't exist in bank_forks");
let new_epoch = root_bank.epoch(); let new_epoch = root_bank.epoch();
if old_epoch != new_epoch { if old_epoch != new_epoch {

View File

@ -0,0 +1,84 @@
//! Currently, there's only one auxiliary type called BankWithScheduler.. This file will be
//! populated by later PRs to align with the filename.
#[cfg(feature = "dev-context-only-utils")]
use qualifier_attr::qualifiers;
use {
crate::bank::Bank,
std::{
fmt::Debug,
ops::Deref,
sync::{Arc, RwLock},
},
};
// currently dummy type; will be replaced with the introduction of real type by upcoming pr...
pub type DefaultInstalledSchedulerBox = ();
/// Very thin wrapper around Arc<Bank>
///
/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
/// which is a pretty dangerous condition. Also, it guarantees to call wait_for_termination() via
/// ::drop() inside BankForks::set_root()'s pruning, perfectly matching to Arc<Bank>'s lifetime by
/// piggybacking on the pruning.
///
/// Semantically, a scheduler is tightly coupled with a particular bank. But scheduler wasn't put
/// into Bank fields to avoid circular-references (a scheduler needs to refer to its accompanied
/// Arc<Bank>). BankWithScheduler behaves almost like Arc<Bank>. It only adds a few of transaction
/// scheduling and scheduler management functions. For this reason, `bank` variable names should be
/// used for `BankWithScheduler` across codebase.
///
/// BankWithScheduler even implements Deref for convenience. And Clone is omitted to implement to
/// avoid ambiguity as to which to clone: BankWithScheduler or Arc<Bank>. Use
/// clone_without_scheduler() for Arc<Bank>. Otherwise, use clone_with_scheduler() (this should be
/// unusual outside scheduler code-path)
#[derive(Debug)]
pub struct BankWithScheduler {
inner: Arc<BankWithSchedulerInner>,
}
#[derive(Debug)]
pub struct BankWithSchedulerInner {
bank: Arc<Bank>,
#[allow(dead_code)]
scheduler: InstalledSchedulerRwLock,
}
pub type InstalledSchedulerRwLock = RwLock<Option<DefaultInstalledSchedulerBox>>;
impl BankWithScheduler {
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<DefaultInstalledSchedulerBox>) -> Self {
Self {
inner: Arc::new(BankWithSchedulerInner {
bank,
scheduler: RwLock::new(scheduler),
}),
}
}
pub fn new_without_scheduler(bank: Arc<Bank>) -> Self {
Self::new(bank, None)
}
pub fn clone_with_scheduler(&self) -> BankWithScheduler {
BankWithScheduler {
inner: self.inner.clone(),
}
}
pub fn clone_without_scheduler(&self) -> Arc<Bank> {
self.inner.bank.clone()
}
pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
RwLock::new(None)
}
}
impl Deref for BankWithScheduler {
type Target = Arc<Bank>;
fn deref(&self) -> &Self::Target {
&self.inner.bank
}
}

View File

@ -16,6 +16,7 @@ pub mod epoch_stakes;
pub mod genesis_utils; pub mod genesis_utils;
pub mod inline_feature_gate_program; pub mod inline_feature_gate_program;
pub mod inline_spl_associated_token_account; pub mod inline_spl_associated_token_account;
pub mod installed_scheduler_pool;
pub mod loader_utils; pub mod loader_utils;
pub mod non_circulating_supply; pub mod non_circulating_supply;
pub mod prioritization_fee; pub mod prioritization_fee;