Define InstalledScheduler::wait_for_termination() (#33922)

* Define InstalledScheduler::wait_for_termination()

* Rename to wait_for_scheduler_termination

* Comment wait_for_termination and WaitReason better
Ryo Onodera 2023-10-31 14:33:36 +09:00 committed by GitHub
parent b2cec5aa48
commit 136ab21f34
8 changed files with 401 additions and 16 deletions
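
For orientation, the new API surface this commit defines in the installed_scheduler_pool module, condensed from the diffs below:

pub trait InstalledScheduler: Send + Sync + Debug + 'static {
    // Calling this is illegal as soon as wait_for_termination() is called.
    fn schedule_execution<'a>(
        &'a self,
        transaction_with_index: &'a (&'a SanitizedTransaction, usize),
    );

    // Blocks until all scheduled transactions have been executed, then returns the
    // finalized result and timings (None only for the paused wait reason).
    #[must_use]
    fn wait_for_termination(&mut self, reason: &WaitReason) -> Option<ResultWithTimings>;
}

pub type ResultWithTimings = (Result<()>, ExecuteTimings);

pub enum WaitReason {
    TerminatedToFreeze,       // normal termination, right before Bank::freeze()
    DroppedFromBankForks,     // termination driven by Drop::drop()
    PausedForRecentBlockhash, // pause; the scheduler retains its ResultWithTimings
}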

Cargo.lock generated
View File

@ -6258,6 +6258,7 @@ dependencies = [
"libc",
"log",
"lru",
"mockall",
"num_cpus",
"num_enum 0.7.0",
"prost",

View File

@ -42,7 +42,8 @@ use {
block_error::BlockError,
blockstore::Blockstore,
blockstore_processor::{
self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender,
self, BlockstoreProcessorError, ConfirmationProgress, ExecuteBatchesInternalMetrics,
TransactionStatusSender,
},
entry_notifier_service::EntryNotifierSender,
leader_schedule_cache::LeaderScheduleCache,
@ -2815,6 +2816,40 @@ impl ReplayStage {
.expect("Bank fork progress entry missing for completed bank");
let replay_stats = bank_progress.replay_stats.clone();
if let Some((result, completed_execute_timings)) =
bank.wait_for_completed_scheduler()
{
let metrics = ExecuteBatchesInternalMetrics::new_with_timings_from_all_threads(
completed_execute_timings,
);
replay_stats
.write()
.unwrap()
.batch_execute
.accumulate(metrics);
if let Err(err) = result {
Self::mark_dead_slot(
blockstore,
bank,
bank_forks.read().unwrap().root(),
&BlockstoreProcessorError::InvalidTransaction(err),
rpc_subscriptions,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
epoch_slots_frozen_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
);
// don't try to run the remaining normal processing for the completed bank
continue;
}
}
let r_replay_stats = replay_stats.read().unwrap();
let replay_progress = bank_progress.replay_progress.clone();
let r_replay_progress = replay_progress.read().unwrap();

View File

@ -25,6 +25,7 @@ lazy_static = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
lru = { workspace = true }
mockall = { workspace = true }
num_cpus = { workspace = true }
num_enum = { workspace = true }
prost = { workspace = true }

View File

@ -216,12 +216,27 @@ fn execute_batch(
}
#[derive(Default)]
struct ExecuteBatchesInternalMetrics {
pub struct ExecuteBatchesInternalMetrics {
execution_timings_per_thread: HashMap<usize, ThreadExecuteTimings>,
total_batches_len: u64,
execute_batches_us: u64,
}
impl ExecuteBatchesInternalMetrics {
pub fn new_with_timings_from_all_threads(execute_timings: ExecuteTimings) -> Self {
const DUMMY_THREAD_INDEX: usize = 999;
let mut new = Self::default();
new.execution_timings_per_thread.insert(
DUMMY_THREAD_INDEX,
ThreadExecuteTimings {
execute_timings,
..ThreadExecuteTimings::default()
},
);
new
}
}
fn execute_batches_internal(
bank: &Arc<Bank>,
batches: &[TransactionBatchWithIndexes],
@ -1068,7 +1083,7 @@ pub struct BatchExecutionTiming {
}
impl BatchExecutionTiming {
fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) {
pub fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) {
let Self {
totals,
wall_clock_us,
@ -1382,6 +1397,9 @@ fn process_bank_0(
&mut ExecuteTimings::default(),
)
.expect("Failed to process bank 0 from ledger. Did you forget to provide a snapshot?");
if let Some((result, _timings)) = bank0.wait_for_completed_scheduler() {
result.unwrap();
}
bank0.freeze();
if blockstore.is_primary_access() {
blockstore.insert_bank_hash(bank0.slot(), bank0.hash(), false);
@ -1784,6 +1802,9 @@ fn process_single_slot(
err
})?;
if let Some((result, _timings)) = bank.wait_for_completed_scheduler() {
result?
}
bank.freeze(); // all banks handled by this routine are created from complete slots
if blockstore.is_primary_access() {
blockstore.insert_bank_hash(bank.slot(), bank.hash(), false);
@ -1924,7 +1945,7 @@ pub mod tests {
genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
},
installed_scheduler_pool::MockInstalledScheduler,
installed_scheduler_pool::{MockInstalledScheduler, WaitReason},
},
solana_sdk::{
account::{AccountSharedData, WritableAccount},
@ -4510,10 +4531,17 @@ pub mod tests {
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
let mut mocked_scheduler = MockInstalledScheduler::new();
let mut seq = mockall::Sequence::new();
mocked_scheduler
.expect_schedule_execution()
.times(txs.len())
.returning(|_| ());
mocked_scheduler
.expect_wait_for_termination()
.with(mockall::predicate::eq(WaitReason::DroppedFromBankForks))
.times(1)
.in_sequence(&mut seq)
.returning(|_| None);
let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler)));
let batch = bank.prepare_sanitized_batch(&txs);

View File

@ -5191,6 +5191,7 @@ dependencies = [
"libc",
"log",
"lru",
"mockall",
"num_cpus",
"num_enum 0.7.0",
"prost",

View File

@ -43,6 +43,7 @@ use {
builtins::{BuiltinPrototype, BUILTINS},
epoch_rewards_hasher::hash_rewards_into_partitions,
epoch_stakes::{EpochStakes, NodeVoteAccounts},
installed_scheduler_pool::{BankWithScheduler, InstalledSchedulerRwLock},
runtime_config::RuntimeConfig,
serde_snapshot::BankIncrementalSnapshotPersistence,
snapshot_hash::SnapshotHash,
@ -220,7 +221,7 @@ mod metrics;
mod serde_snapshot;
mod sysvar_cache;
#[cfg(test)]
mod tests;
pub(crate) mod tests;
mod transaction_account_state_info;
pub const SECONDS_PER_YEAR: f64 = 365.25 * 24.0 * 60.0 * 60.0;
@ -4185,7 +4186,11 @@ impl Bank {
/// Register a new recent blockhash in the bank's recent blockhash queue. Called when a bank
/// reaches its max tick height. Can be called by tests to get new blockhashes for transaction
/// processing without advancing to a new bank slot.
pub fn register_recent_blockhash(&self, blockhash: &Hash) {
fn register_recent_blockhash(&self, blockhash: &Hash, scheduler: &InstalledSchedulerRwLock) {
// This is needed because recent_blockhash updates necessitate synchronizations for
// consistent tx check_age handling.
BankWithScheduler::wait_for_paused_scheduler(self, scheduler);
// Only acquire the write lock for the blockhash queue on block boundaries because
// readers can starve this write lock acquisition and ticks would be slowed down too
// much if the write lock is acquired for each tick.
@ -4197,7 +4202,10 @@ impl Bank {
// gating this under #[cfg(feature = "dev-context-only-utils")] isn't easy due to
// solana-program-test's usage...
pub fn register_unique_recent_blockhash_for_test(&self) {
self.register_recent_blockhash(&Hash::new_unique())
self.register_recent_blockhash(
&Hash::new_unique(),
&BankWithScheduler::no_scheduler_available(),
)
}
/// Tell the bank which Entry IDs exist on the ledger. This function assumes subsequent calls
@ -4206,14 +4214,14 @@ impl Bank {
///
/// This is NOT thread safe because if tick height is updated by two different threads, the
/// block boundary condition could be missed.
pub fn register_tick(&self, hash: &Hash) {
pub fn register_tick(&self, hash: &Hash, scheduler: &InstalledSchedulerRwLock) {
assert!(
!self.freeze_started(),
"register_tick() working on a bank that is already frozen or is undergoing freezing!"
);
if self.is_block_boundary(self.tick_height.load(Relaxed) + 1) {
self.register_recent_blockhash(hash);
self.register_recent_blockhash(hash, scheduler);
}
// ReplayStage will start computing the accounts delta hash when it
@ -4226,18 +4234,17 @@ impl Bank {
#[cfg(feature = "dev-context-only-utils")]
pub fn register_tick_for_test(&self, hash: &Hash) {
// currently meaningless wrapper; upcoming pr will make it an actual helper...
self.register_tick(hash)
self.register_tick(hash, &BankWithScheduler::no_scheduler_available())
}
#[cfg(feature = "dev-context-only-utils")]
pub fn register_default_tick_for_test(&self) {
self.register_tick(&Hash::default())
self.register_tick_for_test(&Hash::default())
}
#[cfg(feature = "dev-context-only-utils")]
pub fn register_unique_tick(&self) {
self.register_tick(&Hash::new_unique())
self.register_tick_for_test(&Hash::new_unique())
}
pub fn is_complete(&self) -> bool {
@ -8008,10 +8015,14 @@ impl Bank {
}
pub fn fill_bank_with_ticks_for_tests(&self) {
self.do_fill_bank_with_ticks_for_tests(&BankWithScheduler::no_scheduler_available())
}
pub(crate) fn do_fill_bank_with_ticks_for_tests(&self, scheduler: &InstalledSchedulerRwLock) {
if self.tick_height.load(Relaxed) < self.max_tick_height {
let last_blockhash = self.last_blockhash();
while self.last_blockhash() == last_blockhash {
self.register_tick(&Hash::new_unique())
self.register_tick(&Hash::new_unique(), scheduler)
}
} else {
warn!("Bank already reached max tick height, cannot fill it with more ticks");

View File

@ -274,7 +274,7 @@ fn test_bank_new() {
assert_eq!(rent.lamports_per_byte_year, 5);
}
fn create_simple_test_bank(lamports: u64) -> Bank {
pub(crate) fn create_simple_test_bank(lamports: u64) -> Bank {
let (genesis_config, _mint_keypair) = create_genesis_config(lamports);
Bank::new_for_tests(&genesis_config)
}

View File

@ -4,7 +4,11 @@
use {
crate::bank::Bank,
log::*,
solana_sdk::transaction::SanitizedTransaction,
solana_program_runtime::timings::ExecuteTimings,
solana_sdk::{
hash::Hash,
transaction::{Result, SanitizedTransaction},
},
std::{
fmt::Debug,
ops::Deref,
@ -23,14 +27,64 @@ use {mockall::automock, qualifier_attr::qualifiers};
allow(unused_attributes, clippy::needless_lifetimes)
)]
pub trait InstalledScheduler: Send + Sync + Debug + 'static {
// Calling this is illegal as soon as wait_for_termination is called.
fn schedule_execution<'a>(
&'a self,
transaction_with_index: &'a (&'a SanitizedTransaction, usize),
);
/// Wait for a scheduler to terminate after it is notified with the given reason.
///
/// First, this function blocks the current thread while waiting for the scheduler to
/// complete all of the executions for the scheduled transactions. This means the scheduler has
/// prepared the finalized `ResultWithTimings` at least internally at the time of exiting from
/// this function. If no transaction is scheduled, the result and timing will be `Ok(())` and
/// `ExecuteTimings::default()` respectively. This is done in the same way regardless of
/// `WaitReason`.
///
/// After that, the scheduler may behave differently depending on the reason with regard to the
/// final bookkeeping. Specifically, this function is guaranteed to return
/// `Some(finalized_result_with_timings)` unless the reason is `PausedForRecentBlockhash`. In
/// the case of `PausedForRecentBlockhash`, the scheduler is responsible for retaining the
/// finalized `ResultWithTimings` until `wait_for_termination()` is called again later with one
/// of the other two reasons.
#[must_use]
fn wait_for_termination(&mut self, reason: &WaitReason) -> Option<ResultWithTimings>;
}
pub type DefaultInstalledSchedulerBox = Box<dyn InstalledScheduler>;
pub type ResultWithTimings = (Result<()>, ExecuteTimings);
/// A hint from the bank about the reason the caller is waiting on its scheduler termination.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum WaitReason {
// The bank wants its scheduler to terminate after the completion of transaction execution, in
// order to freeze itself immediately thereafter. This is by far the most normal wait reason.
//
// Note that `wait_for_termination(TerminatedToFreeze)` must explicitly be done prior
// to Bank::freeze(). This can't be done inside Bank::freeze() implicitly, in order to keep
// it infallible.
TerminatedToFreeze,
// The bank wants its scheduler to terminate just like `TerminatedToFreeze`, while indicating
// that Drop::drop() is the caller.
DroppedFromBankForks,
// The bank wants to pause its scheduler after the completion of transaction execution, without
// the scheduler being returned to the pool, so that its internally-held `ResultWithTimings`
// can be collected later.
PausedForRecentBlockhash,
}
impl WaitReason {
pub fn is_paused(&self) -> bool {
// An exhaustive `match` is preferred here over `matches!()` to force an explicit decision to
// be made, should we add new variants like `PausedForFooBar`...
match self {
WaitReason::PausedForRecentBlockhash => true,
WaitReason::TerminatedToFreeze | WaitReason::DroppedFromBankForks => false,
}
}
}
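To make the contract above concrete, a minimal scheduler that executes nothing but still honors wait_for_termination() could look roughly like the following sketch, assuming the module's imports above; the NoopScheduler type and its single field are hypothetical, not taken from this commit:

#[derive(Debug)]
struct NoopScheduler {
    // The finalized result, retained across a PausedForRecentBlockhash wait.
    result_with_timings: Option<ResultWithTimings>,
}

impl InstalledScheduler for NoopScheduler {
    fn schedule_execution<'a>(
        &'a self,
        _transaction_with_index: &'a (&'a SanitizedTransaction, usize),
    ) {
        // A real scheduler would hand the transaction off to its worker threads here.
    }

    fn wait_for_termination(&mut self, reason: &WaitReason) -> Option<ResultWithTimings> {
        // With nothing scheduled, the finalized result is Ok(()) with default timings,
        // matching the documented behavior above.
        if self.result_with_timings.is_none() {
            self.result_with_timings = Some((Ok(()), ExecuteTimings::default()));
        }
        if reason.is_paused() {
            // Paused: keep the finalized ResultWithTimings for a later terminating wait.
            None
        } else {
            self.result_with_timings.take()
        }
    }
}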
/// Very thin wrapper around Arc<Bank>
///
/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
@ -85,6 +139,14 @@ impl BankWithScheduler {
self.inner.bank.clone()
}
pub fn register_tick(&self, hash: &Hash) {
self.inner.bank.register_tick(hash, &self.inner.scheduler);
}
pub fn fill_bank_with_ticks_for_tests(&self) {
self.do_fill_bank_with_ticks_for_tests(&self.inner.scheduler);
}
pub fn has_installed_scheduler(&self) -> bool {
self.inner.scheduler.read().unwrap().is_some()
}
@ -107,11 +169,111 @@ impl BankWithScheduler {
}
}
// take needless &mut only to communicate its semantic mutability to humans...
#[cfg(feature = "dev-context-only-utils")]
pub fn drop_scheduler(&mut self) {
self.inner.drop_scheduler();
}
pub(crate) fn wait_for_paused_scheduler(bank: &Bank, scheduler: &InstalledSchedulerRwLock) {
let maybe_result_with_timings = BankWithSchedulerInner::wait_for_scheduler_termination(
bank,
scheduler,
WaitReason::PausedForRecentBlockhash,
);
assert!(
maybe_result_with_timings.is_none(),
"Premature result was returned from scheduler after paused"
);
}
#[must_use]
pub fn wait_for_completed_scheduler(&self) -> Option<ResultWithTimings> {
BankWithSchedulerInner::wait_for_scheduler_termination(
&self.inner.bank,
&self.inner.scheduler,
WaitReason::TerminatedToFreeze,
)
}
pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
RwLock::new(None)
}
}
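On the caller side, the intended ordering is to drain the scheduler before freezing the bank, since Bank::freeze() never waits implicitly. A sketch of a hypothetical helper mirroring what process_single_slot and ReplayStage do in the diffs above:

fn finish_block(bank: &BankWithScheduler) -> Result<()> {
    if let Some((result, _timings)) = bank.wait_for_completed_scheduler() {
        // A transaction error surfaced here means the block is invalid; ReplayStage
        // reacts by marking the slot dead rather than propagating the error.
        result?;
    }
    // Per the TerminatedToFreeze comment, waiting must happen before freeze().
    bank.freeze();
    Ok(())
}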
impl BankWithSchedulerInner {
#[must_use]
fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
Self::wait_for_scheduler_termination(
&self.bank,
&self.scheduler,
WaitReason::DroppedFromBankForks,
)
}
#[must_use]
fn wait_for_scheduler_termination(
bank: &Bank,
scheduler: &InstalledSchedulerRwLock,
reason: WaitReason,
) -> Option<ResultWithTimings> {
debug!(
"wait_for_scheduler_termination(slot: {}, reason: {:?}): started...",
bank.slot(),
reason,
);
let mut scheduler = scheduler.write().unwrap();
let result_with_timings = if scheduler.is_some() {
let result_with_timings = scheduler
.as_mut()
.and_then(|scheduler| scheduler.wait_for_termination(&reason));
if !reason.is_paused() {
drop(scheduler.take().expect("scheduler after waiting"));
}
result_with_timings
} else {
None
};
debug!(
"wait_for_scheduler_termination(slot: {}, reason: {:?}): finished with: {:?}...",
bank.slot(),
reason,
result_with_timings.as_ref().map(|(result, _)| result),
);
result_with_timings
}
fn drop_scheduler(&self) {
if std::thread::panicking() {
error!(
"BankWithSchedulerInner::drop_scheduler(): slot: {} skipping due to already panicking...",
self.bank.slot(),
);
return;
}
// There's no guarantee that a ResultWithTimings is available at all when being dropped.
if let Some(Err(err)) = self
.wait_for_completed_scheduler_from_drop()
.map(|(result, _timings)| result)
{
warn!(
"BankWithSchedulerInner::drop_scheduler(): slot: {} discarding error from scheduler: {:?}",
self.bank.slot(),
err,
);
}
}
}
impl Drop for BankWithSchedulerInner {
fn drop(&mut self) {
self.drop_scheduler();
}
}
impl Deref for BankWithScheduler {
type Target = Arc<Bank>;
@ -119,3 +281,149 @@ impl Deref for BankWithScheduler {
&self.inner.bank
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::{
bank::test_utils::goto_end_of_slot_with_scheduler,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
},
assert_matches::assert_matches,
mockall::Sequence,
solana_sdk::system_transaction,
};
fn setup_mocked_scheduler_with_extra(
wait_reasons: impl Iterator<Item = WaitReason>,
f: Option<impl Fn(&mut MockInstalledScheduler)>,
) -> DefaultInstalledSchedulerBox {
let mut mock = MockInstalledScheduler::new();
let mut seq = Sequence::new();
for wait_reason in wait_reasons {
mock.expect_wait_for_termination()
.with(mockall::predicate::eq(wait_reason))
.times(1)
.in_sequence(&mut seq)
.returning(move |_| {
if wait_reason.is_paused() {
None
} else {
Some((Ok(()), ExecuteTimings::default()))
}
});
}
if let Some(f) = f {
f(&mut mock);
}
Box::new(mock)
}
fn setup_mocked_scheduler(
wait_reasons: impl Iterator<Item = WaitReason>,
) -> DefaultInstalledSchedulerBox {
setup_mocked_scheduler_with_extra(
wait_reasons,
None::<fn(&mut MockInstalledScheduler) -> ()>,
)
}
#[test]
fn test_scheduler_normal_termination() {
solana_logger::setup();
let bank = Arc::new(Bank::default_for_tests());
let bank = BankWithScheduler::new(
bank,
Some(setup_mocked_scheduler(
[WaitReason::TerminatedToFreeze].into_iter(),
)),
);
assert!(bank.has_installed_scheduler());
assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
// Calling wait_for_completed_scheduler() again is okay; no further ResultWithTimings will be
// returned.
assert!(!bank.has_installed_scheduler());
assert_matches!(bank.wait_for_completed_scheduler(), None);
}
#[test]
fn test_no_scheduler_termination() {
solana_logger::setup();
let bank = Arc::new(Bank::default_for_tests());
let bank = BankWithScheduler::new_without_scheduler(bank);
// Calling wait_for_completed_scheduler() is a no-op when no scheduler is installed.
assert!(!bank.has_installed_scheduler());
assert_matches!(bank.wait_for_completed_scheduler(), None);
}
#[test]
fn test_scheduler_termination_from_drop() {
solana_logger::setup();
let bank = Arc::new(Bank::default_for_tests());
let bank = BankWithScheduler::new(
bank,
Some(setup_mocked_scheduler(
[WaitReason::DroppedFromBankForks].into_iter(),
)),
);
drop(bank);
}
#[test]
fn test_scheduler_pause() {
solana_logger::setup();
let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
let bank = BankWithScheduler::new(
bank,
Some(setup_mocked_scheduler(
[
WaitReason::PausedForRecentBlockhash,
WaitReason::TerminatedToFreeze,
]
.into_iter(),
)),
);
goto_end_of_slot_with_scheduler(&bank);
assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
}
#[test]
fn test_schedule_executions() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let tx0 = SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_sdk::pubkey::new_rand(),
2,
genesis_config.hash(),
));
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let mocked_scheduler = setup_mocked_scheduler_with_extra(
[WaitReason::DroppedFromBankForks].into_iter(),
Some(|mocked: &mut MockInstalledScheduler| {
mocked
.expect_schedule_execution()
.times(1)
.returning(|(_, _)| ());
}),
);
let bank = BankWithScheduler::new(bank, Some(mocked_scheduler));
bank.schedule_transaction_executions([(&tx0, &0)].into_iter());
}
}