From 136ab21f34793913b5e79dfb38180965ea002019 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 31 Oct 2023 14:33:36 +0900 Subject: [PATCH] Define InstalledScheduler::wait_for_termination() (#33922) * Define InstalledScheduler::wait_for_termination() * Rename to wait_for_scheduler_termination * Comment wait_for_termination and WaitReason better --- Cargo.lock | 1 + core/src/replay_stage.rs | 37 ++- ledger/Cargo.toml | 1 + ledger/src/blockstore_processor.rs | 34 ++- programs/sbf/Cargo.lock | 1 + runtime/src/bank.rs | 31 ++- runtime/src/bank/tests.rs | 2 +- runtime/src/installed_scheduler_pool.rs | 310 +++++++++++++++++++++++- 8 files changed, 401 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 626541d61f..610b5edb49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6258,6 +6258,7 @@ dependencies = [ "libc", "log", "lru", + "mockall", "num_cpus", "num_enum 0.7.0", "prost", diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 5cceb8dff3..2aa0d82be0 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -42,7 +42,8 @@ use { block_error::BlockError, blockstore::Blockstore, blockstore_processor::{ - self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender, + self, BlockstoreProcessorError, ConfirmationProgress, ExecuteBatchesInternalMetrics, + TransactionStatusSender, }, entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache, @@ -2815,6 +2816,40 @@ impl ReplayStage { .expect("Bank fork progress entry missing for completed bank"); let replay_stats = bank_progress.replay_stats.clone(); + + if let Some((result, completed_execute_timings)) = + bank.wait_for_completed_scheduler() + { + let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads( + completed_execute_timings, + ); + replay_stats + .write() + .unwrap() + .batch_execute + .accumulate(metrics); + + if let Err(err) = result { + Self::mark_dead_slot( + blockstore, + bank, + 
bank_forks.read().unwrap().root(), + &BlockstoreProcessorError::InvalidTransaction(err), + rpc_subscriptions, + duplicate_slots_tracker, + gossip_duplicate_confirmed_slots, + epoch_slots_frozen_slots, + progress, + heaviest_subtree_fork_choice, + duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, + purge_repair_slot_counter, + ); + // don't try to run the remaining normal processing for the completed bank + continue; + } + } + let r_replay_stats = replay_stats.read().unwrap(); let replay_progress = bank_progress.replay_progress.clone(); let r_replay_progress = replay_progress.read().unwrap(); diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index b3fb1ac5f9..87ba0c3923 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -25,6 +25,7 @@ lazy_static = { workspace = true } libc = { workspace = true } log = { workspace = true } lru = { workspace = true } +mockall = { workspace = true } num_cpus = { workspace = true } num_enum = { workspace = true } prost = { workspace = true } diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index bb717ff834..ccdfb97ece 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -216,12 +216,27 @@ fn execute_batch( } #[derive(Default)] -struct ExecuteBatchesInternalMetrics { +pub struct ExecuteBatchesInternalMetrics { execution_timings_per_thread: HashMap, total_batches_len: u64, execute_batches_us: u64, } +impl ExecuteBatchesInternalMetrics { + pub fn new_with_timings_from_all_threads(execute_timings: ExecuteTimings) -> Self { + const DUMMY_THREAD_INDEX: usize = 999; + let mut new = Self::default(); + new.execution_timings_per_thread.insert( + DUMMY_THREAD_INDEX, + ThreadExecuteTimings { + execute_timings, + ..ThreadExecuteTimings::default() + }, + ); + new + } +} + fn execute_batches_internal( bank: &Arc, batches: &[TransactionBatchWithIndexes], @@ -1068,7 +1083,7 @@ pub struct BatchExecutionTiming { } impl BatchExecutionTiming { - fn 
accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) { + pub fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) { let Self { totals, wall_clock_us, @@ -1382,6 +1397,9 @@ fn process_bank_0( &mut ExecuteTimings::default(), ) .expect("Failed to process bank 0 from ledger. Did you forget to provide a snapshot?"); + if let Some((result, _timings)) = bank0.wait_for_completed_scheduler() { + result.unwrap(); + } bank0.freeze(); if blockstore.is_primary_access() { blockstore.insert_bank_hash(bank0.slot(), bank0.hash(), false); @@ -1784,6 +1802,9 @@ fn process_single_slot( err })?; + if let Some((result, _timings)) = bank.wait_for_completed_scheduler() { + result? + } bank.freeze(); // all banks handled by this routine are created from complete slots if blockstore.is_primary_access() { blockstore.insert_bank_hash(bank.slot(), bank.hash(), false); @@ -1924,7 +1945,7 @@ pub mod tests { genesis_utils::{ self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs, }, - installed_scheduler_pool::MockInstalledScheduler, + installed_scheduler_pool::{MockInstalledScheduler, WaitReason}, }, solana_sdk::{ account::{AccountSharedData, WritableAccount}, @@ -4510,10 +4531,17 @@ pub mod tests { let txs = create_test_transactions(&mint_keypair, &genesis_config.hash()); let mut mocked_scheduler = MockInstalledScheduler::new(); + let mut seq = mockall::Sequence::new(); mocked_scheduler .expect_schedule_execution() .times(txs.len()) .returning(|_| ()); + mocked_scheduler + .expect_wait_for_termination() + .with(mockall::predicate::eq(WaitReason::DroppedFromBankForks)) + .times(1) + .in_sequence(&mut seq) + .returning(|_| None); let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler))); let batch = bank.prepare_sanitized_batch(&txs); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 71baf3c793..a81280a233 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5191,6 +5191,7 @@ dependencies = [ "libc", 
"log", "lru", + "mockall", "num_cpus", "num_enum 0.7.0", "prost", diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 2bdb21e6d3..7a770833cc 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -43,6 +43,7 @@ use { builtins::{BuiltinPrototype, BUILTINS}, epoch_rewards_hasher::hash_rewards_into_partitions, epoch_stakes::{EpochStakes, NodeVoteAccounts}, + installed_scheduler_pool::{BankWithScheduler, InstalledSchedulerRwLock}, runtime_config::RuntimeConfig, serde_snapshot::BankIncrementalSnapshotPersistence, snapshot_hash::SnapshotHash, @@ -220,7 +221,7 @@ mod metrics; mod serde_snapshot; mod sysvar_cache; #[cfg(test)] -mod tests; +pub(crate) mod tests; mod transaction_account_state_info; pub const SECONDS_PER_YEAR: f64 = 365.25 * 24.0 * 60.0 * 60.0; @@ -4185,7 +4186,11 @@ impl Bank { /// Register a new recent blockhash in the bank's recent blockhash queue. Called when a bank /// reaches its max tick height. Can be called by tests to get new blockhashes for transaction /// processing without advancing to a new bank slot. - pub fn register_recent_blockhash(&self, blockhash: &Hash) { + fn register_recent_blockhash(&self, blockhash: &Hash, scheduler: &InstalledSchedulerRwLock) { + // This is needed because recent_blockhash updates necessitate synchronizations for + // consistent tx check_age handling. + BankWithScheduler::wait_for_paused_scheduler(self, scheduler); + // Only acquire the write lock for the blockhash queue on block boundaries because // readers can starve this write lock acquisition and ticks would be slowed down too // much if the write lock is acquired for each tick. @@ -4197,7 +4202,10 @@ impl Bank { // gating this under #[cfg(feature = "dev-context-only-utils")] isn't easy due to // solana-program-test's usage... 
pub fn register_unique_recent_blockhash_for_test(&self) { - self.register_recent_blockhash(&Hash::new_unique()) + self.register_recent_blockhash( + &Hash::new_unique(), + &BankWithScheduler::no_scheduler_available(), + ) } /// Tell the bank which Entry IDs exist on the ledger. This function assumes subsequent calls @@ -4206,14 +4214,14 @@ impl Bank { /// /// This is NOT thread safe because if tick height is updated by two different threads, the /// block boundary condition could be missed. - pub fn register_tick(&self, hash: &Hash) { + pub fn register_tick(&self, hash: &Hash, scheduler: &InstalledSchedulerRwLock) { assert!( !self.freeze_started(), "register_tick() working on a bank that is already frozen or is undergoing freezing!" ); if self.is_block_boundary(self.tick_height.load(Relaxed) + 1) { - self.register_recent_blockhash(hash); + self.register_recent_blockhash(hash, scheduler); } // ReplayStage will start computing the accounts delta hash when it @@ -4226,18 +4234,17 @@ impl Bank { #[cfg(feature = "dev-context-only-utils")] pub fn register_tick_for_test(&self, hash: &Hash) { - // currently meaningless wrapper; upcoming pr will make it an actual helper... 
- self.register_tick(hash) + self.register_tick(hash, &BankWithScheduler::no_scheduler_available()) } #[cfg(feature = "dev-context-only-utils")] pub fn register_default_tick_for_test(&self) { - self.register_tick(&Hash::default()) + self.register_tick_for_test(&Hash::default()) } #[cfg(feature = "dev-context-only-utils")] pub fn register_unique_tick(&self) { - self.register_tick(&Hash::new_unique()) + self.register_tick_for_test(&Hash::new_unique()) } pub fn is_complete(&self) -> bool { @@ -8008,10 +8015,14 @@ impl Bank { } pub fn fill_bank_with_ticks_for_tests(&self) { + self.do_fill_bank_with_ticks_for_tests(&BankWithScheduler::no_scheduler_available()) + } + + pub(crate) fn do_fill_bank_with_ticks_for_tests(&self, scheduler: &InstalledSchedulerRwLock) { if self.tick_height.load(Relaxed) < self.max_tick_height { let last_blockhash = self.last_blockhash(); while self.last_blockhash() == last_blockhash { - self.register_tick(&Hash::new_unique()) + self.register_tick(&Hash::new_unique(), scheduler) } } else { warn!("Bank already reached max tick height, cannot fill it with more ticks"); diff --git a/runtime/src/bank/tests.rs b/runtime/src/bank/tests.rs index 2f1c0e0aee..df39171d84 100644 --- a/runtime/src/bank/tests.rs +++ b/runtime/src/bank/tests.rs @@ -274,7 +274,7 @@ fn test_bank_new() { assert_eq!(rent.lamports_per_byte_year, 5); } -fn create_simple_test_bank(lamports: u64) -> Bank { +pub(crate) fn create_simple_test_bank(lamports: u64) -> Bank { let (genesis_config, _mint_keypair) = create_genesis_config(lamports); Bank::new_for_tests(&genesis_config) } diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index 5fef97bc6e..553a31c800 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -4,7 +4,11 @@ use { crate::bank::Bank, log::*, - solana_sdk::transaction::SanitizedTransaction, + solana_program_runtime::timings::ExecuteTimings, + solana_sdk::{ + hash::Hash, + 
transaction::{Result, SanitizedTransaction}, + }, std::{ fmt::Debug, ops::Deref, @@ -23,14 +27,64 @@ use {mockall::automock, qualifier_attr::qualifiers}; allow(unused_attributes, clippy::needless_lifetimes) )] pub trait InstalledScheduler: Send + Sync + Debug + 'static { + // Calling this is illegal as soon as wait_for_termination is called. fn schedule_execution<'a>( &'a self, transaction_with_index: &'a (&'a SanitizedTransaction, usize), ); + + /// Wait for a scheduler to terminate after it is notified with the given reason. + /// + /// Firstly, this function blocks the current thread while waiting for the scheduler to + /// complete all of the executions for the scheduled transactions. This means the scheduler has + /// prepared the finalized `ResultWithTimings` at least internally at the time of exiting from + /// this function. If no transaction is scheduled, the result and timing will be `Ok(())` and + /// `ExecuteTimings::default()` respectively. This is done in the same way regardless of + /// `WaitReason`. + /// + /// After that, the scheduler may behave differently depending on the reason, regarding the + /// final bookkeeping. Specifically, this function is guaranteed to return + /// `Some(finalized_result_with_timings)` unless the reason is `PausedForRecentBlockhash`. In + /// the case of `PausedForRecentBlockhash`, the scheduler is responsible for retaining the + /// finalized `ResultWithTimings` until it's `wait_for_termination()`-ed with one of the other + /// two reasons later. + #[must_use] + fn wait_for_termination(&mut self, reason: &WaitReason) -> Option; } pub type DefaultInstalledSchedulerBox = Box; +pub type ResultWithTimings = (Result<()>, ExecuteTimings); + +/// A hint from the bank about the reason the caller is waiting on its scheduler termination. 
+#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum WaitReason { + // The bank wants its scheduler to terminate after the completion of transaction execution, in + // order to freeze itself immediately thereafter. This is by far the most normal wait reason. + // + // Note that `wait_for_termination(TerminatedToFreeze)` must explicitly be done prior + // to Bank::freeze(). This can't be done inside Bank::freeze() implicitly to keep it + // infallible. + TerminatedToFreeze, + // The bank wants its scheduler to terminate just like `TerminatedToFreeze` and indicate that + // Drop::drop() is the caller. + DroppedFromBankForks, + // The bank wants its scheduler to pause after the completion without being + // returned to the pool, to collect the scheduler's internally-held `ResultWithTimings` later. + PausedForRecentBlockhash, +} + +impl WaitReason { + pub fn is_paused(&self) -> bool { + // Exhaustive `match` is preferred here over `matches!()` to trigger an explicit + // decision to be made, should we add new variants like `PausedForFooBar`... + match self { + WaitReason::PausedForRecentBlockhash => true, + WaitReason::TerminatedToFreeze | WaitReason::DroppedFromBankForks => false, + } + } +} + /// Very thin wrapper around Arc<Bank> /// /// It brings type-safety against accidental mixing of bank and scheduler with different slots, @@ -85,6 +139,14 @@ impl BankWithScheduler { self.inner.bank.clone() } + pub fn register_tick(&self, hash: &Hash) { + self.inner.bank.register_tick(hash, &self.inner.scheduler); + } + + pub fn fill_bank_with_ticks_for_tests(&self) { + self.do_fill_bank_with_ticks_for_tests(&self.inner.scheduler); + } + pub fn has_installed_scheduler(&self) -> bool { self.inner.scheduler.read().unwrap().is_some() } @@ -107,11 +169,111 @@ impl BankWithScheduler { } } + // take needless &mut only to communicate its semantic mutability to humans... 
+ #[cfg(feature = "dev-context-only-utils")] + pub fn drop_scheduler(&mut self) { + self.inner.drop_scheduler(); + } + + pub(crate) fn wait_for_paused_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) { + let maybe_result_with_timings = BankWithSchedulerInner::wait_for_scheduler_termination( + bank, + scheduler, + WaitReason::PausedForRecentBlockhash, + ); + assert!( + maybe_result_with_timings.is_none(), + "Premature result was returned from scheduler after paused" + ); + } + + #[must_use] + pub fn wait_for_completed_scheduler(&self) -> Option { + BankWithSchedulerInner::wait_for_scheduler_termination( + &self.inner.bank, + &self.inner.scheduler, + WaitReason::TerminatedToFreeze, + ) + } + pub const fn no_scheduler_available() -> InstalledSchedulerRwLock { RwLock::new(None) } } +impl BankWithSchedulerInner { + #[must_use] + fn wait_for_completed_scheduler_from_drop(&self) -> Option { + Self::wait_for_scheduler_termination( + &self.bank, + &self.scheduler, + WaitReason::DroppedFromBankForks, + ) + } + + #[must_use] + fn wait_for_scheduler_termination( + bank: &Bank, + scheduler: &InstalledSchedulerRwLock, + reason: WaitReason, + ) -> Option { + debug!( + "wait_for_scheduler_termination(slot: {}, reason: {:?}): started...", + bank.slot(), + reason, + ); + + let mut scheduler = scheduler.write().unwrap(); + let result_with_timings = if scheduler.is_some() { + let result_with_timings = scheduler + .as_mut() + .and_then(|scheduler| scheduler.wait_for_termination(&reason)); + if !reason.is_paused() { + drop(scheduler.take().expect("scheduler after waiting")); + } + result_with_timings + } else { + None + }; + debug!( + "wait_for_scheduler_termination(slot: {}, reason: {:?}): finished with: {:?}...", + bank.slot(), + reason, + result_with_timings.as_ref().map(|(result, _)| result), + ); + + result_with_timings + } + + fn drop_scheduler(&self) { + if std::thread::panicking() { + error!( + "BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to 
already panicking...", + self.bank.slot(), + ); + return; + } + + // There's no guarantee ResultWithTimings is available or not at all when being dropped. + if let Some(Err(err)) = self + .wait_for_completed_scheduler_from_drop() + .map(|(result, _timings)| result) + { + warn!( + "BankWithSchedulerInner::drop_scheduler(): slot: {} discarding error from scheduler: {:?}", + self.bank.slot(), + err, + ); + } + } +} + +impl Drop for BankWithSchedulerInner { + fn drop(&mut self) { + self.drop_scheduler(); + } +} + impl Deref for BankWithScheduler { type Target = Arc; @@ -119,3 +281,149 @@ impl Deref for BankWithScheduler { &self.inner.bank } } + +#[cfg(test)] +mod tests { + use { + super::*, + crate::{ + bank::test_utils::goto_end_of_slot_with_scheduler, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }, + assert_matches::assert_matches, + mockall::Sequence, + solana_sdk::system_transaction, + }; + + fn setup_mocked_scheduler_with_extra( + wait_reasons: impl Iterator, + f: Option, + ) -> DefaultInstalledSchedulerBox { + let mut mock = MockInstalledScheduler::new(); + let mut seq = Sequence::new(); + + for wait_reason in wait_reasons { + mock.expect_wait_for_termination() + .with(mockall::predicate::eq(wait_reason)) + .times(1) + .in_sequence(&mut seq) + .returning(move |_| { + if wait_reason.is_paused() { + None + } else { + Some((Ok(()), ExecuteTimings::default())) + } + }); + } + + if let Some(f) = f { + f(&mut mock); + } + + Box::new(mock) + } + + fn setup_mocked_scheduler( + wait_reasons: impl Iterator, + ) -> DefaultInstalledSchedulerBox { + setup_mocked_scheduler_with_extra( + wait_reasons, + None:: ()>, + ) + } + + #[test] + fn test_scheduler_normal_termination() { + solana_logger::setup(); + + let bank = Arc::new(Bank::default_for_tests()); + let bank = BankWithScheduler::new( + bank, + Some(setup_mocked_scheduler( + [WaitReason::TerminatedToFreeze].into_iter(), + )), + ); + assert!(bank.has_installed_scheduler()); + 
assert_matches!(bank.wait_for_completed_scheduler(), Some(_)); + + // Repeating to call wait_for_completed_scheduler() is okay with no ResultWithTimings being + // returned. + assert!(!bank.has_installed_scheduler()); + assert_matches!(bank.wait_for_completed_scheduler(), None); + } + + #[test] + fn test_no_scheduler_termination() { + solana_logger::setup(); + + let bank = Arc::new(Bank::default_for_tests()); + let bank = BankWithScheduler::new_without_scheduler(bank); + + // Calling wait_for_completed_scheduler() is noop, when no scheduler is installed. + assert!(!bank.has_installed_scheduler()); + assert_matches!(bank.wait_for_completed_scheduler(), None); + } + + #[test] + fn test_scheduler_termination_from_drop() { + solana_logger::setup(); + + let bank = Arc::new(Bank::default_for_tests()); + let bank = BankWithScheduler::new( + bank, + Some(setup_mocked_scheduler( + [WaitReason::DroppedFromBankForks].into_iter(), + )), + ); + drop(bank); + } + + #[test] + fn test_scheduler_pause() { + solana_logger::setup(); + + let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42)); + let bank = BankWithScheduler::new( + bank, + Some(setup_mocked_scheduler( + [ + WaitReason::PausedForRecentBlockhash, + WaitReason::TerminatedToFreeze, + ] + .into_iter(), + )), + ); + goto_end_of_slot_with_scheduler(&bank); + assert_matches!(bank.wait_for_completed_scheduler(), Some(_)); + } + + #[test] + fn test_schedule_executions() { + solana_logger::setup(); + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. 
+ } = create_genesis_config(10_000); + let tx0 = SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let mocked_scheduler = setup_mocked_scheduler_with_extra( + [WaitReason::DroppedFromBankForks].into_iter(), + Some(|mocked: &mut MockInstalledScheduler| { + mocked + .expect_schedule_execution() + .times(1) + .returning(|(_, _)| ()); + }), + ); + + let bank = BankWithScheduler::new(bank, Some(mocked_scheduler)); + bank.schedule_transaction_executions([(&tx0, &0)].into_iter()); + } +}