From d2b5afc41001a16a1dbbbf6106c0ef85177b53bc Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 19 Dec 2023 09:50:41 +0900 Subject: [PATCH] Finish unified scheduler plumbing with min impl (#34300) * Finalize unified scheduler plumbing with min impl * Fix comment * Rename leftover type name... * Make logging text less ambiguous * Make PhantomData simplyer without already used S * Make TaskHandler stateless again * Introduce HandlerContext to simplify TaskHandler * Add comment for coexistence of Pool::{new,new_dyn} * Fix grammar * Remove confusing const for upcoming changes * Demote InstalledScheduler::context() into dcou * Delay drop of context up to return_to_pool()-ing * Revert "Demote InstalledScheduler::context() into dcou" This reverts commit 049a126c905df0ba8ad975c5cb1007ae90a21050. * Revert "Delay drop of context up to return_to_pool()-ing" This reverts commit 60b1bd2511a714690b0b2331e49bc3d0c72e3475. * Make context handling really type-safe * Update comment * Fix grammar... * Refine type aliases for boxed traits * Swap the tuple order for readability & semantics * Simplify PooledScheduler::result_with_timings type * Restore .in_sequence() * Use where for aesthetics * Simplify if... * Fix typo... * Polish ::schedule_execution() a bit * Fix rebase conflicts.. * Make test more readable * Fix test failures after rebase... --- Cargo.lock | 20 + Cargo.toml | 4 + ci/run-sanity.sh | 3 +- core/Cargo.toml | 1 + core/src/validator.rs | 20 + ledger-tool/Cargo.toml | 1 + ledger-tool/src/ledger_utils.rs | 21 + ledger/src/blockstore_processor.rs | 37 +- local-cluster/tests/local_cluster.rs | 41 +- programs/sbf/Cargo.lock | 17 + runtime/src/bank.rs | 2 +- runtime/src/installed_scheduler_pool.rs | 153 ++--- runtime/src/prioritization_fee_cache.rs | 1 + unified-scheduler-logic/Cargo.toml | 10 + unified-scheduler-logic/src/lib.rs | 1 + unified-scheduler-pool/Cargo.toml | 23 + unified-scheduler-pool/src/lib.rs | 761 ++++++++++++++++++++++++ 17 files changed, 1031 insertions(+), 85 deletions(-) create mode 100644 unified-scheduler-logic/Cargo.toml create mode 100644 unified-scheduler-logic/src/lib.rs create mode 100644 unified-scheduler-pool/Cargo.toml create mode 100644 unified-scheduler-pool/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index fe2355270a..9f7d7d6dea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5972,6 +5972,7 @@ dependencies = [ "solana-tpu-client", "solana-transaction-status", "solana-turbine", + "solana-unified-scheduler-pool", "solana-version", "solana-vote", "solana-vote-program", @@ -6431,6 +6432,7 @@ dependencies = [ "solana-storage-bigtable", "solana-streamer", "solana-transaction-status", + "solana-unified-scheduler-pool", "solana-version", "solana-vote-program", "solana_rbpf", @@ -7544,6 +7546,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "solana-unified-scheduler-logic" +version = "1.18.0" + +[[package]] +name = "solana-unified-scheduler-pool" +version = "1.18.0" +dependencies = [ + "assert_matches", + "solana-ledger", + "solana-logger", + "solana-program-runtime", + "solana-runtime", + "solana-sdk", + "solana-unified-scheduler-logic", + "solana-vote", +] + [[package]] name = "solana-upload-perf" version = "1.18.0" diff --git a/Cargo.toml b/Cargo.toml index 844e54f072..d4885ba7f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,6 +108,8 @@ members = [ "transaction-status", "turbine", "udp-client", + "unified-scheduler-logic", + "unified-scheduler-pool", "upload-perf", "validator", "version", @@ -357,6 +359,8 @@ solana-pubsub-client = { path = "pubsub-client", version = "=1.18.0" } solana-quic-client = { path = "quic-client", version = "=1.18.0" } solana-rayon-threadlimit = { path = "rayon-threadlimit", version = "=1.18.0" } solana-remote-wallet = { path = "remote-wallet", version = "=1.18.0", default-features = false } +solana-unified-scheduler-logic = { path = "unified-scheduler-logic", version = "=1.18.0" } +solana-unified-scheduler-pool = { path = "unified-scheduler-pool", version = "=1.18.0" } solana-rpc = { path = "rpc", version = "=1.18.0" } solana-rpc-client = { path = "rpc-client", version = "=1.18.0", default-features = false } solana-rpc-client-api = { path = "rpc-client-api", version = "=1.18.0" } diff --git a/ci/run-sanity.sh b/ci/run-sanity.sh index 3e674d92f4..8108d13a06 100755 --- a/ci/run-sanity.sh +++ b/ci/run-sanity.sh @@ -39,4 +39,5 @@ $solana_ledger_tool create-snapshot --ledger config/ledger "$snapshot_slot" conf cp config/ledger/genesis.tar.bz2 config/snapshot-ledger $solana_ledger_tool copy --ledger config/ledger \ --target-db config/snapshot-ledger --starting-slot "$snapshot_slot" --ending-slot "$latest_slot" -$solana_ledger_tool verify --ledger config/snapshot-ledger +$solana_ledger_tool verify --ledger config/snapshot-ledger --block-verification-method blockstore-processor +$solana_ledger_tool verify --ledger config/snapshot-ledger --block-verification-method unified-scheduler diff --git a/core/Cargo.toml b/core/Cargo.toml index 0bc1a3fe37..bc1bd4549f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -69,6 +69,7 @@ solana-streamer = { workspace = true } solana-tpu-client = { workspace = true } solana-transaction-status = { workspace = true } solana-turbine = { workspace = true } +solana-unified-scheduler-pool = { workspace = true } solana-version = { workspace = true } solana-vote = { workspace = true } solana-vote-program = { workspace = true } diff --git a/core/src/validator.rs b/core/src/validator.rs index df5ec80f43..5f4a312123 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -118,6 +118,7 @@ use { solana_send_transaction_service::send_transaction_service, solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, solana_turbine::{self, broadcast_stage::BroadcastStageType}, + solana_unified_scheduler_pool::DefaultSchedulerPool, solana_vote_program::vote_state, solana_wen_restart::wen_restart::wait_for_wen_restart, std::{ @@ -144,6 +145,7 @@ const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80; pub enum BlockVerificationMethod { #[default] BlockstoreProcessor, + UnifiedScheduler, } impl BlockVerificationMethod { @@ -813,6 +815,24 @@ impl Validator { // (by both replay stage and banking stage) let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default()); + match &config.block_verification_method { + BlockVerificationMethod::BlockstoreProcessor => { + info!("no scheduler pool is installed for block verification..."); + } + BlockVerificationMethod::UnifiedScheduler => { + let scheduler_pool = DefaultSchedulerPool::new_dyn( + config.runtime_config.log_messages_bytes_limit, + transaction_status_sender.clone(), + Some(replay_vote_sender.clone()), + prioritization_fee_cache.clone(), + ); + bank_forks + .write() + .unwrap() + .install_scheduler_pool(scheduler_pool); + } + } + let leader_schedule_cache = Arc::new(leader_schedule_cache); let entry_notification_sender = entry_notifier_service .as_ref() diff --git a/ledger-tool/Cargo.toml b/ledger-tool/Cargo.toml index 94d9bbe470..ddc1ca9b56 100644 --- a/ledger-tool/Cargo.toml +++ b/ledger-tool/Cargo.toml @@ -45,6 +45,7 @@ solana-stake-program = { workspace = true } solana-storage-bigtable = { workspace = true } solana-streamer = { workspace = true } solana-transaction-status = { workspace = true } +solana-unified-scheduler-pool = { workspace = true } solana-version = { workspace = true } solana-vote-program = { workspace = true } solana_rbpf = { workspace = true, features = ["debugger"] } diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index e3f1b48a0a..e72804c201 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -30,6 +30,7 @@ use { PrunedBanksRequestHandler, SnapshotRequestHandler, }, bank_forks::BankForks, + prioritization_fee_cache::PrioritizationFeeCache, snapshot_config::SnapshotConfig, snapshot_hash::StartingSnapshotHashes, snapshot_utils::{ @@ -42,6 +43,7 @@ use { timing::timestamp, }, solana_streamer::socket::SocketAddrSpace, + solana_unified_scheduler_pool::DefaultSchedulerPool, std::{ path::{Path, PathBuf}, process::exit, @@ -305,6 +307,25 @@ pub fn load_and_process_ledger( "Using: block-verification-method: {}", block_verification_method, ); + match block_verification_method { + BlockVerificationMethod::BlockstoreProcessor => { + info!("no scheduler pool is installed for block verification..."); + } + BlockVerificationMethod::UnifiedScheduler => { + let no_transaction_status_sender = None; + let no_replay_vote_sender = None; + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + bank_forks + .write() + .unwrap() + .install_scheduler_pool(DefaultSchedulerPool::new_dyn( + process_options.runtime_config.log_messages_bytes_limit, + no_transaction_status_sender, + no_replay_vote_sender, + ignored_prioritization_fee_cache, + )); + } + } let node_id = Arc::new(Keypair::new()); let cluster_info = Arc::new(ClusterInfo::new( diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index e2208ce557..cc8a4e5cb6 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -74,7 +74,7 @@ use { thiserror::Error, }; -struct TransactionBatchWithIndexes<'a, 'b> { +pub struct TransactionBatchWithIndexes<'a, 'b> { pub batch: TransactionBatch<'a, 'b>, pub transaction_indexes: Vec, } @@ -134,7 +134,7 @@ fn get_first_error( first_err } -fn execute_batch( +pub fn execute_batch( batch: &TransactionBatchWithIndexes, bank: &Arc, transaction_status_sender: Option<&TransactionStatusSender>, @@ -1832,7 +1832,7 @@ pub struct TransactionStatusBatch { pub transaction_indexes: Vec, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct TransactionStatusSender { pub sender: Sender, } @@ -1947,7 +1947,9 @@ pub mod tests { genesis_utils::{ self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs, }, - installed_scheduler_pool::{MockInstalledScheduler, SchedulingContext, WaitReason}, + installed_scheduler_pool::{ + MockInstalledScheduler, MockUninstalledScheduler, SchedulingContext, + }, }, solana_sdk::{ account::{AccountSharedData, WritableAccount}, @@ -4545,11 +4547,12 @@ pub mod tests { let txs = create_test_transactions(&mint_keypair, &genesis_config.hash()); let mut mocked_scheduler = MockInstalledScheduler::new(); - let mut seq = mockall::Sequence::new(); + let seq = Arc::new(Mutex::new(mockall::Sequence::new())); + let seq_cloned = seq.clone(); mocked_scheduler .expect_context() .times(1) - .in_sequence(&mut seq) + .in_sequence(&mut seq.lock().unwrap()) .return_const(context); mocked_scheduler .expect_schedule_execution() @@ -4557,15 +4560,21 @@ pub mod tests { .returning(|_| ()); mocked_scheduler .expect_wait_for_termination() - .with(mockall::predicate::eq(WaitReason::DroppedFromBankForks)) + .with(mockall::predicate::eq(true)) .times(1) - .in_sequence(&mut seq) - .returning(|_| None); - mocked_scheduler - .expect_return_to_pool() - .times(1) - .in_sequence(&mut seq) - .returning(|| ()); + .in_sequence(&mut seq.lock().unwrap()) + .returning(move |_| { + let mut mocked_uninstalled_scheduler = MockUninstalledScheduler::new(); + mocked_uninstalled_scheduler + .expect_return_to_pool() + .times(1) + .in_sequence(&mut seq_cloned.lock().unwrap()) + .returning(|| ()); + ( + (Ok(()), ExecuteTimings::default()), + Box::new(mocked_uninstalled_scheduler), + ) + }); let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler))); let batch = bank.prepare_sanitized_batch(&txs); diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 597134cce0..150b4d3550 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -4,6 +4,7 @@ use { crossbeam_channel::{unbounded, Receiver}, gag::BufferRedirect, log::*, + rand::seq::IteratorRandom, serial_test::serial, solana_accounts_db::{ accounts_db::create_accounts_run_and_snapshot_dirs, hardened_unpack::open_genesis_config, @@ -15,7 +16,7 @@ use { }, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, replay_stage::DUPLICATE_THRESHOLD, - validator::ValidatorConfig, + validator::{BlockVerificationMethod, ValidatorConfig}, }, solana_download_utils::download_snapshot_archive, solana_entry::entry::create_ticks, @@ -5456,6 +5457,44 @@ fn test_duplicate_shreds_switch_failure() { ); } +#[test] +#[serial] +fn test_randomly_mixed_block_verification_methods_between_bootstrap_and_not() { + // tailored logging just to see two block verification methods are working correctly + solana_logger::setup_with_default( + "solana_metrics::metrics=warn,\ + solana_core=warn,\ + solana_runtime::installed_scheduler_pool=trace,\ + solana_ledger::blockstore_processor=debug,\ + info", + ); + + let num_nodes = 2; + let mut config = ClusterConfig::new_with_equal_stakes( + num_nodes, + DEFAULT_CLUSTER_LAMPORTS, + DEFAULT_NODE_STAKE, + ); + + // Randomly switch to use unified scheduler + config + .validator_configs + .iter_mut() + .choose(&mut rand::thread_rng()) + .unwrap() + .block_verification_method = BlockVerificationMethod::UnifiedScheduler; + + let local = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); + cluster_tests::spend_and_verify_all_nodes( + &local.entry_point_info, + &local.funding_keypair, + num_nodes, + HashSet::new(), + SocketAddrSpace::Unspecified, + &local.connection_cache, + ); +} + /// Forks previous marked invalid should be marked as such in fork choice on restart #[test] #[serial] diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 224b06f4e9..2909c6b5ae 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5041,6 +5041,7 @@ dependencies = [ "solana-tpu-client", "solana-transaction-status", "solana-turbine", + "solana-unified-scheduler-pool", "solana-version", "solana-vote", "solana-vote-program", @@ -6545,6 +6546,22 @@ dependencies = [ "tokio", ] +[[package]] +name = "solana-unified-scheduler-logic" +version = "1.18.0" + +[[package]] +name = "solana-unified-scheduler-pool" +version = "1.18.0" +dependencies = [ + "solana-ledger", + "solana-program-runtime", + "solana-runtime", + "solana-sdk", + "solana-unified-scheduler-logic", + "solana-vote", +] + [[package]] name = "solana-validator" version = "1.18.0" diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index b865c86d2b..fe63067e08 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -4290,7 +4290,7 @@ impl Bank { } /// Prepare a transaction batch from a single transaction without locking accounts - pub(crate) fn prepare_unlocked_batch_from_single_tx<'a>( + pub fn prepare_unlocked_batch_from_single_tx<'a>( &'a self, transaction: &'a SanitizedTransaction, ) -> TransactionBatch<'_, '_> { diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index 35b46e420f..d39a18d567 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -39,7 +39,7 @@ use { use {mockall::automock, qualifier_attr::qualifiers}; pub trait InstalledSchedulerPool: Send + Sync + Debug { - fn take_scheduler(&self, context: SchedulingContext) -> DefaultInstalledSchedulerBox; + fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox; } #[cfg_attr(doc, aquamarine::aquamarine)] @@ -107,28 +107,36 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static { transaction_with_index: &'a (&'a SanitizedTransaction, usize), ); - /// Wait for a scheduler to terminate after it is notified with the given reason. + /// Wait for a scheduler to terminate after processing. /// - /// Firstly, this function blocks the current thread while waiting for the scheduler to - /// complete all of the executions for the scheduled transactions. This means the scheduler has - /// prepared the finalized `ResultWithTimings` at least internally at the time of existing from - /// this function. If no trsanction is scheduled, the result and timing will be `Ok(())` and - /// `ExecuteTimings::default()` respectively. This is done in the same way regardless of - /// `WaitReason`. + /// This function blocks the current thread while waiting for the scheduler to complete all of + /// the executions for the scheduled transactions and to return the finalized + /// `ResultWithTimings`. Along with the result, this function also makes the scheduler itself + /// uninstalled from the bank by transforming the consumed self. /// - /// After that, the scheduler may behave differently depending on the reason, regarding the - /// final bookkeeping. Specifically, this function guaranteed to return - /// `Some(finalized_result_with_timings)` unless the reason is `PausedForRecentBlockhash`. In - /// the case of `PausedForRecentBlockhash`, the scheduler is responsible to retain the - /// finalized `ResultWithTimings` until it's `wait_for_termination()`-ed with one of the other - /// two reasons later. - #[must_use] - fn wait_for_termination(&mut self, reason: &WaitReason) -> Option; + /// If no transaction is scheduled, the result and timing will be `Ok(())` and + /// `ExecuteTimings::default()` respectively. + fn wait_for_termination( + self: Box, + is_dropped: bool, + ) -> (ResultWithTimings, UninstalledSchedulerBox); + /// Pause a scheduler after processing to update bank's recent blockhash. + /// + /// This function blocks the current thread like wait_for_termination(). However, the scheduler + /// won't be consumed. This means the scheduler is responsible to retain the finalized + /// `ResultWithTimings` internally until it's `wait_for_termination()`-ed to collect the result + /// later. + fn pause_for_recent_blockhash(&mut self); +} + +#[cfg_attr(feature = "dev-context-only-utils", automock)] +pub trait UninstalledScheduler: Send + Sync + Debug + 'static { fn return_to_pool(self: Box); } -pub type DefaultInstalledSchedulerBox = Box; +pub type InstalledSchedulerBox = Box; +pub type UninstalledSchedulerBox = Box; pub type InstalledSchedulerPoolArc = Arc; @@ -165,9 +173,9 @@ impl SchedulingContext { pub type ResultWithTimings = (Result<()>, ExecuteTimings); -/// A hint from the bank about the reason the caller is waiting on its scheduler termination. +/// A hint from the bank about the reason the caller is waiting on its scheduler. #[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum WaitReason { +enum WaitReason { // The bank wants its scheduler to terminate after the completion of transaction execution, in // order to freeze itself immediately thereafter. This is by far the most normal wait reason. // @@ -178,8 +186,9 @@ pub enum WaitReason { // The bank wants its scheduler to terminate just like `TerminatedToFreeze` and indicate that // Drop::drop() is the caller. DroppedFromBankForks, - // The bank wants its scheduler to pause the scheduler after the completion without being - // returned to the pool to collect scheduler's internally-held `ResultWithTimings` later. + // The bank wants its scheduler to pause after the completion without being returned to the + // pool. This is to update bank's recent blockhash and to collect scheduler's internally-held + // `ResultWithTimings` later. PausedForRecentBlockhash, } @@ -192,6 +201,15 @@ impl WaitReason { WaitReason::TerminatedToFreeze | WaitReason::DroppedFromBankForks => false, } } + + pub fn is_dropped(&self) -> bool { + // Exhaustive `match` is preferred here than `matches!()` to trigger an explicit + // decision to be made, should we add new variants like `PausedForFooBar`... + match self { + WaitReason::DroppedFromBankForks => true, + WaitReason::TerminatedToFreeze | WaitReason::PausedForRecentBlockhash => false, + } + } } /// Very thin wrapper around Arc @@ -221,11 +239,11 @@ pub struct BankWithSchedulerInner { bank: Arc, scheduler: InstalledSchedulerRwLock, } -pub type InstalledSchedulerRwLock = RwLock>; +pub type InstalledSchedulerRwLock = RwLock>; impl BankWithScheduler { #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] - pub(crate) fn new(bank: Arc, scheduler: Option) -> Self { + pub(crate) fn new(bank: Arc, scheduler: Option) -> Self { if let Some(bank_in_context) = scheduler .as_ref() .map(|scheduler| scheduler.context().bank()) @@ -341,18 +359,18 @@ impl BankWithSchedulerInner { ); let mut scheduler = scheduler.write().unwrap(); - let result_with_timings = if scheduler.is_some() { - let result_with_timings = scheduler - .as_mut() - .and_then(|scheduler| scheduler.wait_for_termination(&reason)); - if !reason.is_paused() { - let scheduler = scheduler.take().expect("scheduler after waiting"); - scheduler.return_to_pool(); - } - result_with_timings - } else { - None - }; + let result_with_timings = + if let Some(scheduler) = scheduler.as_mut().filter(|_| reason.is_paused()) { + scheduler.pause_for_recent_blockhash(); + None + } else if let Some(scheduler) = scheduler.take() { + let (result_with_timings, uninstalled_scheduler) = + scheduler.wait_for_termination(reason.is_dropped()); + uninstalled_scheduler.return_to_pool(); + Some(result_with_timings) + } else { + None + }; debug!( "wait_for_scheduler_termination(slot: {}, reason: {:?}): finished with: {:?}...", bank.slot(), @@ -411,39 +429,42 @@ mod tests { assert_matches::assert_matches, mockall::Sequence, solana_sdk::system_transaction, + std::sync::Mutex, }; fn setup_mocked_scheduler_with_extra( bank: Arc, - wait_reasons: impl Iterator, + is_dropped_flags: impl Iterator, f: Option, - ) -> DefaultInstalledSchedulerBox { + ) -> InstalledSchedulerBox { let mut mock = MockInstalledScheduler::new(); - let mut seq = Sequence::new(); + let seq = Arc::new(Mutex::new(Sequence::new())); mock.expect_context() .times(1) - .in_sequence(&mut seq) + .in_sequence(&mut seq.lock().unwrap()) .return_const(SchedulingContext::new(bank)); - for wait_reason in wait_reasons { + for wait_reason in is_dropped_flags { + let seq_cloned = seq.clone(); mock.expect_wait_for_termination() .with(mockall::predicate::eq(wait_reason)) .times(1) - .in_sequence(&mut seq) + .in_sequence(&mut seq.lock().unwrap()) .returning(move |_| { - if wait_reason.is_paused() { - None - } else { - Some((Ok(()), ExecuteTimings::default())) - } + let mut mock_uninstalled = MockUninstalledScheduler::new(); + mock_uninstalled + .expect_return_to_pool() + .times(1) + .in_sequence(&mut seq_cloned.lock().unwrap()) + .returning(|| ()); + ( + (Ok(()), ExecuteTimings::default()), + Box::new(mock_uninstalled), + ) }); } - mock.expect_return_to_pool() - .times(1) - .in_sequence(&mut seq) - .returning(|| ()); if let Some(f) = f { f(&mut mock); } @@ -453,11 +474,11 @@ mod tests { fn setup_mocked_scheduler( bank: Arc, - wait_reasons: impl Iterator, - ) -> DefaultInstalledSchedulerBox { + is_dropped_flags: impl Iterator, + ) -> InstalledSchedulerBox { setup_mocked_scheduler_with_extra( bank, - wait_reasons, + is_dropped_flags, None:: ()>, ) } @@ -469,10 +490,7 @@ mod tests { let bank = Arc::new(Bank::default_for_tests()); let bank = BankWithScheduler::new( bank.clone(), - Some(setup_mocked_scheduler( - bank, - [WaitReason::TerminatedToFreeze].into_iter(), - )), + Some(setup_mocked_scheduler(bank, [false].into_iter())), ); assert!(bank.has_installed_scheduler()); assert_matches!(bank.wait_for_completed_scheduler(), Some(_)); @@ -502,10 +520,7 @@ mod tests { let bank = Arc::new(Bank::default_for_tests()); let bank = BankWithScheduler::new( bank.clone(), - Some(setup_mocked_scheduler( - bank, - [WaitReason::DroppedFromBankForks].into_iter(), - )), + Some(setup_mocked_scheduler(bank, [true].into_iter())), ); drop(bank); } @@ -517,13 +532,15 @@ mod tests { let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42)); let bank = BankWithScheduler::new( bank.clone(), - Some(setup_mocked_scheduler( + Some(setup_mocked_scheduler_with_extra( bank, - [ - WaitReason::PausedForRecentBlockhash, - WaitReason::TerminatedToFreeze, - ] - .into_iter(), + [false].into_iter(), + Some(|mocked: &mut MockInstalledScheduler| { + mocked + .expect_pause_for_recent_blockhash() + .times(1) + .returning(|| ()); + }), )), ); goto_end_of_slot_with_scheduler(&bank); @@ -548,7 +565,7 @@ mod tests { let bank = Arc::new(Bank::new_for_tests(&genesis_config)); let mocked_scheduler = setup_mocked_scheduler_with_extra( bank.clone(), - [WaitReason::DroppedFromBankForks].into_iter(), + [true].into_iter(), Some(|mocked: &mut MockInstalledScheduler| { mocked .expect_schedule_execution() diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index c41d5a72bd..ece749387a 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -142,6 +142,7 @@ type SlotPrioritizationFee = DashMap; /// Stores up to MAX_NUM_RECENT_BLOCKS recent block's prioritization fee, /// A separate internal thread `service_thread` handles additional tasks when a bank is frozen, /// and collecting stats and reporting metrics. +#[derive(Debug)] pub struct PrioritizationFeeCache { cache: Arc>>>, service_thread: Option>, diff --git a/unified-scheduler-logic/Cargo.toml b/unified-scheduler-logic/Cargo.toml new file mode 100644 index 0000000000..764bb0192f --- /dev/null +++ b/unified-scheduler-logic/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "solana-unified-scheduler-logic" +description = "The Solana unified scheduler logic" +documentation = "https://docs.rs/solana-unified-scheduler-logic" +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs new file mode 100644 index 0000000000..73a5a82f6d --- /dev/null +++ b/unified-scheduler-logic/src/lib.rs @@ -0,0 +1 @@ +// This file will be populated with actual implementation later. diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml new file mode 100644 index 0000000000..213bc5bb86 --- /dev/null +++ b/unified-scheduler-pool/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "solana-unified-scheduler-pool" +description = "The Solana unified scheduler pool" +documentation = "https://docs.rs/solana-unified-scheduler-pool" +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +[dependencies] +solana-ledger = { workspace = true } +solana-program-runtime = { workspace = true } +solana-runtime = { workspace = true } +solana-sdk = { workspace = true } +solana-unified-scheduler-logic = { workspace = true } +solana-vote = { workspace = true } + +[dev-dependencies] +assert_matches = { workspace = true } +solana-logger = { workspace = true } +solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs new file mode 100644 index 0000000000..10cb5309e5 --- /dev/null +++ b/unified-scheduler-pool/src/lib.rs @@ -0,0 +1,761 @@ +//! Transaction scheduling code. +//! +//! This crate implements 3 solana-runtime traits (`InstalledScheduler`, `UninstalledScheduler` and +//! `InstalledSchedulerPool`) to provide a concrete transaction scheduling implementation +//! (including executing txes and committing tx results). +//! +//! At the highest level, this crate takes `SanitizedTransaction`s via its `schedule_execution()` +//! and commits any side-effects (i.e. on-chain state changes) into the associated `Bank` via +//! `solana-ledger`'s helper function called `execute_batch()`. + +use { + solana_ledger::blockstore_processor::{ + execute_batch, TransactionBatchWithIndexes, TransactionStatusSender, + }, + solana_program_runtime::timings::ExecuteTimings, + solana_runtime::{ + bank::Bank, + installed_scheduler_pool::{ + InstalledScheduler, InstalledSchedulerBox, InstalledSchedulerPool, + InstalledSchedulerPoolArc, ResultWithTimings, SchedulerId, SchedulingContext, + UninstalledScheduler, UninstalledSchedulerBox, + }, + prioritization_fee_cache::PrioritizationFeeCache, + }, + solana_sdk::transaction::{Result, SanitizedTransaction}, + solana_vote::vote_sender_types::ReplayVoteSender, + std::{ + fmt::Debug, + marker::PhantomData, + sync::{ + atomic::{AtomicU64, Ordering::Relaxed}, + Arc, Mutex, Weak, + }, + }, +}; + +type AtomicSchedulerId = AtomicU64; + +// SchedulerPool must be accessed as a dyn trait from solana-runtime, because SchedulerPool +// contains some internal fields, whose types aren't available in solana-runtime (currently +// TransactionStatusSender; also, PohRecorder in the future)... +#[derive(Debug)] +pub struct SchedulerPool, TH: TaskHandler> { + scheduler_inners: Mutex>, + handler_context: HandlerContext, + // weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to + // Arc from &Self, because SchedulerPool is used as in the form of Arc + // almost always. But, this would cause wasted and noisy Arc::clone()'s at every call sites. + // + // Alternatively, `impl InstalledScheduler for Arc` approach could be explored + // but it entails its own problems due to rustc's coherence and necessitated newtype with the + // type graph of InstalledScheduler being quite elaborate. + // + // After these considerations, this weak_self approach is chosen at the cost of some additional + // memory increase. + weak_self: Weak, + next_scheduler_id: AtomicSchedulerId, + _phantom: PhantomData, +} + +#[derive(Debug)] +pub struct HandlerContext { + log_messages_bytes_limit: Option, + transaction_status_sender: Option, + replay_vote_sender: Option, + prioritization_fee_cache: Arc, +} + +pub type DefaultSchedulerPool = + SchedulerPool, DefaultTaskHandler>; + +impl SchedulerPool +where + S: SpawnableScheduler, + TH: TaskHandler, +{ + // Some internal impl and test code want an actual concrete type, NOT the + // `dyn InstalledSchedulerPool`. So don't merge this into `Self::new_dyn()`. + fn new( + log_messages_bytes_limit: Option, + transaction_status_sender: Option, + replay_vote_sender: Option, + prioritization_fee_cache: Arc, + ) -> Arc { + Arc::new_cyclic(|weak_self| Self { + scheduler_inners: Mutex::default(), + handler_context: HandlerContext { + log_messages_bytes_limit, + transaction_status_sender, + replay_vote_sender, + prioritization_fee_cache, + }, + weak_self: weak_self.clone(), + next_scheduler_id: AtomicSchedulerId::default(), + _phantom: PhantomData, + }) + } + + // This apparently-meaningless wrapper is handy, because some callers explicitly want + // `dyn InstalledSchedulerPool` to be returned for type inference convenience. + pub fn new_dyn( + log_messages_bytes_limit: Option, + transaction_status_sender: Option, + replay_vote_sender: Option, + prioritization_fee_cache: Arc, + ) -> InstalledSchedulerPoolArc { + Self::new( + log_messages_bytes_limit, + transaction_status_sender, + replay_vote_sender, + prioritization_fee_cache, + ) + } + + // See a comment at the weak_self field for justification of this method's existence. + fn self_arc(&self) -> Arc { + self.weak_self + .upgrade() + .expect("self-referencing Arc-ed pool") + } + + fn new_scheduler_id(&self) -> SchedulerId { + self.next_scheduler_id.fetch_add(1, Relaxed) + } + + fn return_scheduler(&self, scheduler: S::Inner) { + self.scheduler_inners + .lock() + .expect("not poisoned") + .push(scheduler); + } + + fn do_take_scheduler(&self, context: SchedulingContext) -> S { + // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been + // returned recently + if let Some(inner) = self.scheduler_inners.lock().expect("not poisoned").pop() { + S::from_inner(inner, context) + } else { + S::spawn(self.self_arc(), context) + } + } +} + +impl InstalledSchedulerPool for SchedulerPool +where + S: SpawnableScheduler, + TH: TaskHandler, +{ + fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox { + Box::new(self.do_take_scheduler(context)) + } +} + +pub trait TaskHandler: Send + Sync + Debug + Sized + 'static { + fn handle( + result: &mut Result<()>, + timings: &mut ExecuteTimings, + bank: &Arc, + transaction: &SanitizedTransaction, + index: usize, + handler_context: &HandlerContext, + ); +} + +#[derive(Debug)] +pub struct DefaultTaskHandler; + +impl TaskHandler for DefaultTaskHandler { + fn handle( + result: &mut Result<()>, + timings: &mut ExecuteTimings, + bank: &Arc, + transaction: &SanitizedTransaction, + index: usize, + handler_context: &HandlerContext, + ) { + // scheduler must properly prevent conflicting tx executions. thus, task handler isn't + // responsible for locking. + let batch = bank.prepare_unlocked_batch_from_single_tx(transaction); + let batch_with_indexes = TransactionBatchWithIndexes { + batch, + transaction_indexes: vec![index], + }; + + *result = execute_batch( + &batch_with_indexes, + bank, + handler_context.transaction_status_sender.as_ref(), + handler_context.replay_vote_sender.as_ref(), + timings, + handler_context.log_messages_bytes_limit, + &handler_context.prioritization_fee_cache, + ); + } +} + +// Currently, simplest possible implementation (i.e. single-threaded) +// this will be replaced with more proper implementation... +// not usable at all, especially for mainnet-beta +#[derive(Debug)] +pub struct PooledScheduler { + inner: PooledSchedulerInner, + context: SchedulingContext, + result_with_timings: Mutex, +} + +#[derive(Debug)] +pub struct PooledSchedulerInner, TH: TaskHandler> { + id: SchedulerId, + pool: Arc>, +} + +impl PooledScheduler { + fn do_spawn(pool: Arc>, initial_context: SchedulingContext) -> Self { + Self::from_inner( + PooledSchedulerInner:: { + id: pool.new_scheduler_id(), + pool, + }, + initial_context, + ) + } +} + +pub trait SpawnableScheduler: InstalledScheduler { + type Inner: Debug + Send + Sync; + + fn into_inner(self) -> (ResultWithTimings, Self::Inner); + + fn from_inner(inner: Self::Inner, context: SchedulingContext) -> Self; + + fn spawn(pool: Arc>, initial_context: SchedulingContext) -> Self + where + Self: Sized; +} + +impl SpawnableScheduler for PooledScheduler { + type Inner = PooledSchedulerInner; + + fn into_inner(self) -> (ResultWithTimings, Self::Inner) { + ( + self.result_with_timings.into_inner().expect("not poisoned"), + self.inner, + ) + } + + fn from_inner(inner: Self::Inner, context: SchedulingContext) -> Self { + Self { + inner, + context, + result_with_timings: Mutex::new((Ok(()), ExecuteTimings::default())), + } + } + + fn spawn(pool: Arc>, initial_context: SchedulingContext) -> Self { + Self::do_spawn(pool, initial_context) + } +} + +impl InstalledScheduler for PooledScheduler { + fn id(&self) -> SchedulerId { + self.inner.id + } + + fn context(&self) -> &SchedulingContext { + &self.context + } + + fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) { + let (result, timings) = &mut *self.result_with_timings.lock().expect("not poisoned"); + if result.is_err() { + // just bail out early to short-circuit the processing altogether + return; + } + + // ... so, we're NOT scheduling at all here; rather, just execute tx straight off. the + // inter-tx locking deps aren't needed to be resolved in the case of single-threaded FIFO + // like this. + TH::handle( + result, + timings, + self.context().bank(), + transaction, + index, + &self.inner.pool.handler_context, + ); + } + + fn wait_for_termination( + self: Box, + _is_dropped: bool, + ) -> (ResultWithTimings, UninstalledSchedulerBox) { + let (result_with_timings, uninstalled_scheduler) = self.into_inner(); + (result_with_timings, Box::new(uninstalled_scheduler)) + } + + fn pause_for_recent_blockhash(&mut self) { + // not surprisingly, there's nothing to do for this min impl! + } +} + +impl UninstalledScheduler for PooledSchedulerInner +where + S: SpawnableScheduler>, + TH: TaskHandler, +{ + fn return_to_pool(self: Box) { + self.pool.clone().return_scheduler(*self) + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + assert_matches::assert_matches, + solana_runtime::{ + bank::Bank, + bank_forks::BankForks, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + installed_scheduler_pool::{BankWithScheduler, SchedulingContext}, + prioritization_fee_cache::PrioritizationFeeCache, + }, + solana_sdk::{ + clock::MAX_PROCESSING_AGE, + pubkey::Pubkey, + signer::keypair::Keypair, + system_transaction, + transaction::{SanitizedTransaction, TransactionError}, + }, + std::{sync::Arc, thread::JoinHandle}, + }; + + #[test] + fn test_scheduler_pool_new() { + solana_logger::setup(); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + + // this indirectly proves that there should be circular link because there's only one Arc + // at this moment now + assert_eq!((Arc::strong_count(&pool), Arc::weak_count(&pool)), (1, 1)); + let debug = format!("{pool:#?}"); + assert!(!debug.is_empty()); + } + + #[test] + fn test_scheduler_spawn() { + solana_logger::setup(); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + let bank = Arc::new(Bank::default_for_tests()); + let context = SchedulingContext::new(bank); + let scheduler = pool.take_scheduler(context); + + let debug = format!("{scheduler:#?}"); + assert!(!debug.is_empty()); + } + + #[test] + fn test_scheduler_pool_filo() { + solana_logger::setup(); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache); + let bank = Arc::new(Bank::default_for_tests()); + let context = &SchedulingContext::new(bank); + + let scheduler1 = pool.do_take_scheduler(context.clone()); + let scheduler_id1 = scheduler1.id(); + let scheduler2 = pool.do_take_scheduler(context.clone()); + let scheduler_id2 = scheduler2.id(); + assert_ne!(scheduler_id1, scheduler_id2); + + let (result_with_timings, scheduler1) = scheduler1.into_inner(); + assert_matches!(result_with_timings, (Ok(()), _)); + pool.return_scheduler(scheduler1); + let (result_with_timings, scheduler2) = scheduler2.into_inner(); + assert_matches!(result_with_timings, (Ok(()), _)); + pool.return_scheduler(scheduler2); + + let scheduler3 = pool.do_take_scheduler(context.clone()); + assert_eq!(scheduler_id2, scheduler3.id()); + let scheduler4 = pool.do_take_scheduler(context.clone()); + assert_eq!(scheduler_id1, scheduler4.id()); + } + + #[test] + fn test_scheduler_pool_context_drop_unless_reinitialized() { + solana_logger::setup(); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache); + let bank = Arc::new(Bank::default_for_tests()); + let context = &SchedulingContext::new(bank); + let mut scheduler = pool.do_take_scheduler(context.clone()); + + // should never panic. + scheduler.pause_for_recent_blockhash(); + assert_matches!( + Box::new(scheduler).wait_for_termination(false), + ((Ok(()), _), _) + ); + } + + #[test] + fn test_scheduler_pool_context_replace() { + solana_logger::setup(); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache); + let old_bank = &Arc::new(Bank::default_for_tests()); + let new_bank = &Arc::new(Bank::default_for_tests()); + assert!(!Arc::ptr_eq(old_bank, new_bank)); + + let old_context = &SchedulingContext::new(old_bank.clone()); + let new_context = &SchedulingContext::new(new_bank.clone()); + + let scheduler = pool.do_take_scheduler(old_context.clone()); + let scheduler_id = scheduler.id(); + pool.return_scheduler(scheduler.into_inner().1); + + let scheduler = pool.take_scheduler(new_context.clone()); + assert_eq!(scheduler_id, scheduler.id()); + assert!(Arc::ptr_eq(scheduler.context().bank(), new_bank)); + } + + #[test] + fn test_scheduler_pool_install_into_bank_forks() { + solana_logger::setup(); + + let bank = Bank::default_for_tests(); + let bank_forks = BankForks::new_rw_arc(bank); + let mut bank_forks = bank_forks.write().unwrap(); + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + bank_forks.install_scheduler_pool(pool); + } + + #[test] + fn test_scheduler_install_into_bank() { + solana_logger::setup(); + + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let child_bank = Bank::new_from_parent(bank, &Pubkey::default(), 1); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + + let bank = Bank::default_for_tests(); + let bank_forks = BankForks::new_rw_arc(bank); + let mut bank_forks = bank_forks.write().unwrap(); + + // existing banks in bank_forks shouldn't process transactions anymore in general, so + // shouldn't be touched + assert!(!bank_forks + .working_bank_with_scheduler() + .has_installed_scheduler()); + bank_forks.install_scheduler_pool(pool); + assert!(!bank_forks + .working_bank_with_scheduler() + .has_installed_scheduler()); + + let mut child_bank = bank_forks.insert(child_bank); + assert!(child_bank.has_installed_scheduler()); + bank_forks.remove(child_bank.slot()); + child_bank.drop_scheduler(); + assert!(!child_bank.has_installed_scheduler()); + } + + fn setup_dummy_fork_graph(bank: Bank) -> Arc { + let slot = bank.slot(); + let bank_fork = BankForks::new_rw_arc(bank); + let bank = bank_fork.read().unwrap().get(slot).unwrap(); + bank.loaded_programs_cache + .write() + .unwrap() + .set_fork_graph(bank_fork); + bank + } + + #[test] + fn test_scheduler_schedule_execution_success() { + solana_logger::setup(); + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(10_000); + let tx0 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + let bank = Bank::new_for_tests(&genesis_config); + let bank = setup_dummy_fork_graph(bank); + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + let context = SchedulingContext::new(bank.clone()); + + assert_eq!(bank.transaction_count(), 0); + let scheduler = pool.take_scheduler(context); + scheduler.schedule_execution(&(tx0, 0)); + let bank = BankWithScheduler::new(bank, Some(scheduler)); + assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); + assert_eq!(bank.transaction_count(), 1); + } + + #[test] + fn test_scheduler_schedule_execution_failure() { + solana_logger::setup(); + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank = setup_dummy_fork_graph(bank); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + let context = SchedulingContext::new(bank.clone()); + let mut scheduler = pool.take_scheduler(context); + + let unfunded_keypair = Keypair::new(); + let bad_tx = + &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &unfunded_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + assert_eq!(bank.transaction_count(), 0); + scheduler.schedule_execution(&(bad_tx, 0)); + scheduler.pause_for_recent_blockhash(); + assert_eq!(bank.transaction_count(), 0); + + let good_tx_after_bad_tx = + &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_sdk::pubkey::new_rand(), + 3, + genesis_config.hash(), + )); + // make sure this tx is really a good one to execute. + assert_matches!( + bank.simulate_transaction_unchecked(good_tx_after_bad_tx, false) + .result, + Ok(_) + ); + scheduler.schedule_execution(&(good_tx_after_bad_tx, 0)); + scheduler.pause_for_recent_blockhash(); + // transaction_count should remain same as scheduler should be bailing out. + assert_eq!(bank.transaction_count(), 0); + + let bank = BankWithScheduler::new(bank, Some(scheduler)); + assert_matches!( + bank.wait_for_completed_scheduler(), + Some(( + Err(solana_sdk::transaction::TransactionError::AccountNotFound), + _timings + )) + ); + } + + #[derive(Debug)] + struct AsyncScheduler( + PooledScheduler, + Mutex>>, + ); + + impl AsyncScheduler { + fn do_wait(&self) { + let mut overall_result = Ok(()); + let mut overall_timings = ExecuteTimings::default(); + for handle in self.1.lock().unwrap().drain(..) { + let (result, timings) = handle.join().unwrap(); + match result { + Ok(()) => {} + Err(e) => overall_result = Err(e), + } + overall_timings.accumulate(&timings); + } + *self.0.result_with_timings.lock().unwrap() = (overall_result, overall_timings); + } + } + + impl InstalledScheduler + for AsyncScheduler + { + fn id(&self) -> SchedulerId { + self.0.id() + } + + fn context(&self) -> &SchedulingContext { + self.0.context() + } + + fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) { + let transaction_and_index = (transaction.clone(), index); + let context = self.context().clone(); + let pool = self.0.inner.pool.clone(); + + self.1.lock().unwrap().push(std::thread::spawn(move || { + // intentionally sleep to simulate race condition where register_recent_blockhash + // is handle before finishing executing scheduled transactions + std::thread::sleep(std::time::Duration::from_secs(1)); + + let mut result = Ok(()); + let mut timings = ExecuteTimings::default(); + + ::handle( + &mut result, + &mut timings, + context.bank(), + &transaction_and_index.0, + transaction_and_index.1, + &pool.handler_context, + ); + (result, timings) + })); + } + + fn wait_for_termination( + self: Box, + is_dropped: bool, + ) -> (ResultWithTimings, UninstalledSchedulerBox) { + self.do_wait(); + Box::new(self.0).wait_for_termination(is_dropped) + } + + fn pause_for_recent_blockhash(&mut self) { + if TRIGGER_RACE_CONDITION { + // this is equivalent to NOT calling wait_for_paused_scheduler() in + // register_recent_blockhash(). + return; + } + self.do_wait(); + } + } + + impl SpawnableScheduler + for AsyncScheduler + { + // well, i wish i can use ! (never type)..... + type Inner = Self; + + fn into_inner(self) -> (ResultWithTimings, Self::Inner) { + todo!(); + } + + fn from_inner(_inner: Self::Inner, _context: SchedulingContext) -> Self { + todo!(); + } + + fn spawn( + pool: Arc>, + initial_context: SchedulingContext, + ) -> Self { + AsyncScheduler::( + PooledScheduler::::from_inner( + PooledSchedulerInner { + id: pool.new_scheduler_id(), + pool: SchedulerPool::new( + pool.handler_context.log_messages_bytes_limit, + pool.handler_context.transaction_status_sender.clone(), + pool.handler_context.replay_vote_sender.clone(), + pool.handler_context.prioritization_fee_cache.clone(), + ), + }, + initial_context, + ), + Mutex::new(vec![]), + ) + } + } + + fn do_test_scheduler_schedule_execution_recent_blockhash_edge_case< + const TRIGGER_RACE_CONDITION: bool, + >() { + solana_logger::setup(); + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(10_000); + let very_old_valid_tx = + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + let mut bank = Bank::new_for_tests(&genesis_config); + for _ in 0..MAX_PROCESSING_AGE { + bank.fill_bank_with_ticks_for_tests(); + bank.freeze(); + let slot = bank.slot(); + bank = Bank::new_from_parent( + Arc::new(bank), + &Pubkey::default(), + slot.checked_add(1).unwrap(), + ); + } + let bank = setup_dummy_fork_graph(bank); + let context = SchedulingContext::new(bank.clone()); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + SchedulerPool::, DefaultTaskHandler>::new_dyn( + None, + None, + None, + ignored_prioritization_fee_cache, + ); + let scheduler = pool.take_scheduler(context); + + let bank = BankWithScheduler::new(bank, Some(scheduler)); + assert_eq!(bank.transaction_count(), 0); + + // schedule but not immediately execute transaction + bank.schedule_transaction_executions([(&very_old_valid_tx, &0)].into_iter()); + // this calls register_recent_blockhash internally + bank.fill_bank_with_ticks_for_tests(); + + if TRIGGER_RACE_CONDITION { + // very_old_valid_tx is wrongly handled as expired! + assert_matches!( + bank.wait_for_completed_scheduler(), + Some((Err(TransactionError::BlockhashNotFound), _)) + ); + assert_eq!(bank.transaction_count(), 0); + } else { + assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); + assert_eq!(bank.transaction_count(), 1); + } + } + + #[test] + fn test_scheduler_schedule_execution_recent_blockhash_edge_case_with_race() { + do_test_scheduler_schedule_execution_recent_blockhash_edge_case::(); + } + + #[test] + fn test_scheduler_schedule_execution_recent_blockhash_edge_case_without_race() { + do_test_scheduler_schedule_execution_recent_blockhash_edge_case::(); + } +}