Introduce InstalledSchedulerPool trait (#33934)

* Introduce InstalledSchedulerPool

* Use type alias

* Remove log_prefix for now...

* Simplify return_to_pool()

* Simplify InstalledScheduler's context methods

* Reorder trait methods semantically

* Simplify Arc<Bank> handling
This commit is contained in:
Ryo Onodera 2023-11-03 16:02:12 +09:00 committed by GitHub
parent 1c00d5d81a
commit a4a66026e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 242 additions and 9 deletions

34
Cargo.lock generated
View File

@ -167,6 +167,20 @@ version = "1.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]]
name = "aquamarine"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df752953c49ce90719c7bf1fc587bc8227aed04732ea0c0f85e5397d7fdbd1a1"
dependencies = [
"include_dir",
"itertools",
"proc-macro-error",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "arc-swap"
version = "1.5.0"
@ -2611,6 +2625,25 @@ dependencies = [
"version_check",
]
[[package]]
name = "include_dir"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18762faeff7122e89e0857b02f7ce6fcc0d101d5e9ad2ad7846cc01d61b7f19e"
dependencies = [
"include_dir_macros",
]
[[package]]
name = "include_dir_macros"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b139284b5cf57ecfa712bcc66950bb635b31aff41c188e8a4cfc758eca374a3f"
dependencies = [
"proc-macro2",
"quote",
]
[[package]]
name = "index_list"
version = "0.2.7"
@ -6943,6 +6976,7 @@ dependencies = [
name = "solana-runtime"
version = "1.18.0"
dependencies = [
"aquamarine",
"arrayref",
"assert_matches",
"base64 0.21.5",

View File

@ -132,6 +132,7 @@ edition = "2021"
[workspace.dependencies]
Inflector = "0.11.4"
aquamarine = "0.3.2"
aes-gcm-siv = "0.10.3"
ahash = "0.8.6"
anyhow = "1.0.75"

View File

@ -1945,7 +1945,7 @@ pub mod tests {
genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
},
installed_scheduler_pool::{MockInstalledScheduler, WaitReason},
installed_scheduler_pool::{MockInstalledScheduler, SchedulingContext, WaitReason},
},
solana_sdk::{
account::{AccountSharedData, WritableAccount},
@ -4527,11 +4527,17 @@ pub mod tests {
..
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let context = SchedulingContext::new(bank.clone());
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
let mut mocked_scheduler = MockInstalledScheduler::new();
let mut seq = mockall::Sequence::new();
mocked_scheduler
.expect_context()
.times(1)
.in_sequence(&mut seq)
.return_const(context);
mocked_scheduler
.expect_schedule_execution()
.times(txs.len())
@ -4542,6 +4548,11 @@ pub mod tests {
.times(1)
.in_sequence(&mut seq)
.returning(|_| None);
mocked_scheduler
.expect_return_to_pool()
.times(1)
.in_sequence(&mut seq)
.returning(|| ());
let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler)));
let batch = bank.prepare_sanitized_batch(&txs);

View File

@ -156,6 +156,20 @@ version = "1.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]]
name = "aquamarine"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df752953c49ce90719c7bf1fc587bc8227aed04732ea0c0f85e5397d7fdbd1a1"
dependencies = [
"include_dir",
"itertools",
"proc-macro-error",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "arc-swap"
version = "1.5.0"
@ -2245,6 +2259,25 @@ dependencies = [
"version_check",
]
[[package]]
name = "include_dir"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18762faeff7122e89e0857b02f7ce6fcc0d101d5e9ad2ad7846cc01d61b7f19e"
dependencies = [
"include_dir_macros",
]
[[package]]
name = "include_dir_macros"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b139284b5cf57ecfa712bcc66950bb635b31aff41c188e8a4cfc758eca374a3f"
dependencies = [
"proc-macro2",
"quote",
]
[[package]]
name = "index_list"
version = "0.2.7"
@ -5648,6 +5681,7 @@ dependencies = [
name = "solana-runtime"
version = "1.18.0"
dependencies = [
"aquamarine",
"arrayref",
"base64 0.21.5",
"bincode",

View File

@ -10,6 +10,7 @@ license = { workspace = true }
edition = { workspace = true }
[dependencies]
aquamarine = { workspace = true }
arrayref = { workspace = true }
base64 = { workspace = true }
bincode = { workspace = true }

View File

@ -4,7 +4,9 @@ use {
crate::{
accounts_background_service::{AbsRequestSender, SnapshotRequest, SnapshotRequestKind},
bank::{epoch_accounts_hash_utils, Bank, SquashTiming},
installed_scheduler_pool::BankWithScheduler,
installed_scheduler_pool::{
BankWithScheduler, InstalledSchedulerPoolArc, SchedulingContext,
},
snapshot_config::SnapshotConfig,
},
log::*,
@ -72,6 +74,7 @@ pub struct BankForks {
last_accounts_hash_slot: Slot,
in_vote_only_mode: Arc<AtomicBool>,
highest_slot_at_startup: Slot,
scheduler_pool: Option<InstalledSchedulerPoolArc>,
}
impl Index<u64> for BankForks {
@ -203,6 +206,7 @@ impl BankForks {
last_accounts_hash_slot: root,
in_vote_only_mode: Arc::new(AtomicBool::new(false)),
highest_slot_at_startup: 0,
scheduler_pool: None,
}));
for bank in bank_forks.read().unwrap().banks.values() {
@ -215,11 +219,26 @@ impl BankForks {
bank_forks
}
pub fn install_scheduler_pool(&mut self, pool: InstalledSchedulerPoolArc) {
info!("Installed new scheduler_pool into bank_forks: {:?}", pool);
assert!(
self.scheduler_pool.replace(pool).is_none(),
"Reinstalling scheduler pool isn't supported"
);
}
pub fn insert(&mut self, mut bank: Bank) -> BankWithScheduler {
bank.check_program_modification_slot =
self.root.load(Ordering::Relaxed) < self.highest_slot_at_startup;
let bank = BankWithScheduler::new_without_scheduler(Arc::new(bank));
let bank = Arc::new(bank);
let bank = if let Some(scheduler_pool) = &self.scheduler_pool {
let context = SchedulingContext::new(bank.clone());
let scheduler = scheduler_pool.take_scheduler(context);
BankWithScheduler::new(bank, Some(scheduler))
} else {
BankWithScheduler::new_without_scheduler(bank)
};
let prev = self.banks.insert(bank.slot(), bank.clone_with_scheduler());
assert!(prev.is_none());
let slot = bank.slot();

View File

@ -1,5 +1,24 @@
//! Currently, there are only two things: minimal InstalledScheduler trait and an auxiliary type
//! called BankWithScheduler.. This file will be populated by later PRs to align with the filename.
//! Transaction processing glue code, mainly consisting of Object-safe traits
//!
//! [InstalledSchedulerPool] lends one of pooled [InstalledScheduler]s as wrapped in
//! [BankWithScheduler], which can be used by `ReplayStage` and `BankingStage` for transaction
//! execution. After use, the scheduler will be returned to the pool.
//!
//! [InstalledScheduler] can be fed with [SanitizedTransaction]s. Then, it schedules those
//! executions and commits those results into the associated _bank_.
//!
//! It's generally assumed that each [InstalledScheduler] is backed by multiple threads for
//! parallel transaction processing and there are multiple independent schedulers inside a single
//! instance of [InstalledSchedulerPool].
//!
//! Dynamic dispatch was inevitable due to the desire to piggyback on
//! [BankForks](crate::bank_forks::BankForks)'s pruning for scheduler lifecycle management as the
//! common place both for `ReplayStage` and `BankingStage` and the resultant need of invoking
//! actual implementations provided by the dependent crate (`solana-unified-scheduler-pool`, which
//! in turn depends on `solana-ledger`, which in turn depends on `solana-runtime`), avoiding a
//! cyclic dependency.
//!
//! See [InstalledScheduler] for visualized interaction.
use {
crate::bank::Bank,
@ -7,6 +26,7 @@ use {
solana_program_runtime::timings::ExecuteTimings,
solana_sdk::{
hash::Hash,
slot_history::Slot,
transaction::{Result, SanitizedTransaction},
},
std::{
@ -18,6 +38,57 @@ use {
#[cfg(feature = "dev-context-only-utils")]
use {mockall::automock, qualifier_attr::qualifiers};
pub trait InstalledSchedulerPool: Send + Sync + Debug {
fn take_scheduler(&self, context: SchedulingContext) -> DefaultInstalledSchedulerBox;
}
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Schedules, executes, and commits transactions under encapsulated implementation
///
/// The following chart illustrates the ownership/reference interaction between inter-dependent
/// objects across crates:
///
/// ```mermaid
/// graph TD
/// Bank["Arc#lt;Bank#gt;"]
///
/// subgraph solana-runtime
/// BankForks;
/// BankWithScheduler;
/// Bank;
/// LoadExecuteAndCommitTransactions(["load_execute_and_commit_transactions()"]);
/// SchedulingContext;
/// InstalledSchedulerPool{{InstalledSchedulerPool}};
/// InstalledScheduler{{InstalledScheduler}};
/// end
///
/// subgraph solana-unified-scheduler-pool
/// SchedulerPool;
/// PooledScheduler;
/// ScheduleExecution(["schedule_execution()"]);
/// end
///
/// subgraph solana-ledger
/// ExecuteBatch(["execute_batch()"]);
/// end
///
/// ScheduleExecution -. calls .-> ExecuteBatch;
/// BankWithScheduler -. dyn-calls .-> ScheduleExecution;
/// ExecuteBatch -. calls .-> LoadExecuteAndCommitTransactions;
/// linkStyle 0,1,2 stroke:gray,color:gray;
///
/// BankForks -- owns --> BankWithScheduler;
/// BankForks -- owns --> InstalledSchedulerPool;
/// BankWithScheduler -- refs --> Bank;
/// BankWithScheduler -- owns --> InstalledScheduler;
/// SchedulingContext -- refs --> Bank;
/// InstalledScheduler -- owns --> SchedulingContext;
///
/// SchedulerPool -- owns --> PooledScheduler;
/// SchedulerPool -. impls .-> InstalledSchedulerPool;
/// PooledScheduler -. impls .-> InstalledScheduler;
/// PooledScheduler -- refs --> SchedulerPool;
/// ```
#[cfg_attr(feature = "dev-context-only-utils", automock)]
// suppress false clippy complaints arising from mockall-derive:
// warning: `#[must_use]` has no effect when applied to a struct field
@ -27,6 +98,9 @@ use {mockall::automock, qualifier_attr::qualifiers};
allow(unused_attributes, clippy::needless_lifetimes)
)]
pub trait InstalledScheduler: Send + Sync + Debug + 'static {
fn id(&self) -> SchedulerId;
fn context(&self) -> &SchedulingContext;
// Calling this is illegal as soon as wait_for_termination is called.
fn schedule_execution<'a>(
&'a self,
@ -50,10 +124,45 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static {
/// two reasons later.
#[must_use]
fn wait_for_termination(&mut self, reason: &WaitReason) -> Option<ResultWithTimings>;
fn return_to_pool(self: Box<Self>);
}
pub type DefaultInstalledSchedulerBox = Box<dyn InstalledScheduler>;
pub type InstalledSchedulerPoolArc = Arc<dyn InstalledSchedulerPool>;
pub type SchedulerId = u64;
/// A small context to propagate a bank and its scheduling mode to the scheduler subsystem.
///
/// Note that this isn't called `SchedulerContext` because the contexts aren't associated with
/// schedulers one by one. A scheduler will use many SchedulingContexts during its lifetime.
/// "Scheduling" part of the context name refers to an abstract slice of time to schedule and
/// execute all transactions for a given bank for block verification or production. A context is
/// expected to be used by a particular scheduler only for that duration of the time and to be
/// disposed by the scheduler. Then, the scheduler may work on different banks with new
/// `SchedulingContext`s.
#[derive(Clone, Debug)]
pub struct SchedulingContext {
// mode: SchedulingMode, // this will be added later.
bank: Arc<Bank>,
}
impl SchedulingContext {
pub fn new(bank: Arc<Bank>) -> Self {
Self { bank }
}
pub fn bank(&self) -> &Arc<Bank> {
&self.bank
}
pub fn slot(&self) -> Slot {
self.bank().slot()
}
}
pub type ResultWithTimings = (Result<()>, ExecuteTimings);
/// A hint from the bank about the reason the caller is waiting on its scheduler termination.
@ -117,6 +226,13 @@ pub type InstalledSchedulerRwLock = RwLock<Option<DefaultInstalledSchedulerBox>>
impl BankWithScheduler {
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<DefaultInstalledSchedulerBox>) -> Self {
if let Some(bank_in_context) = scheduler
.as_ref()
.map(|scheduler| scheduler.context().bank())
{
assert!(Arc::ptr_eq(&bank, bank_in_context));
}
Self {
inner: Arc::new(BankWithSchedulerInner {
bank,
@ -229,7 +345,8 @@ impl BankWithSchedulerInner {
.as_mut()
.and_then(|scheduler| scheduler.wait_for_termination(&reason));
if !reason.is_paused() {
drop(scheduler.take().expect("scheduler after waiting"));
let scheduler = scheduler.take().expect("scheduler after waiting");
scheduler.return_to_pool();
}
result_with_timings
} else {
@ -296,12 +413,18 @@ mod tests {
};
fn setup_mocked_scheduler_with_extra(
bank: Arc<Bank>,
wait_reasons: impl Iterator<Item = WaitReason>,
f: Option<impl Fn(&mut MockInstalledScheduler)>,
) -> DefaultInstalledSchedulerBox {
let mut mock = MockInstalledScheduler::new();
let mut seq = Sequence::new();
mock.expect_context()
.times(1)
.in_sequence(&mut seq)
.return_const(SchedulingContext::new(bank));
for wait_reason in wait_reasons {
mock.expect_wait_for_termination()
.with(mockall::predicate::eq(wait_reason))
@ -316,6 +439,10 @@ mod tests {
});
}
mock.expect_return_to_pool()
.times(1)
.in_sequence(&mut seq)
.returning(|| ());
if let Some(f) = f {
f(&mut mock);
}
@ -324,9 +451,11 @@ mod tests {
}
fn setup_mocked_scheduler(
bank: Arc<Bank>,
wait_reasons: impl Iterator<Item = WaitReason>,
) -> DefaultInstalledSchedulerBox {
setup_mocked_scheduler_with_extra(
bank,
wait_reasons,
None::<fn(&mut MockInstalledScheduler) -> ()>,
)
@ -338,8 +467,9 @@ mod tests {
let bank = Arc::new(Bank::default_for_tests());
let bank = BankWithScheduler::new(
bank,
bank.clone(),
Some(setup_mocked_scheduler(
bank,
[WaitReason::TerminatedToFreeze].into_iter(),
)),
);
@ -370,8 +500,9 @@ mod tests {
let bank = Arc::new(Bank::default_for_tests());
let bank = BankWithScheduler::new(
bank,
bank.clone(),
Some(setup_mocked_scheduler(
bank,
[WaitReason::DroppedFromBankForks].into_iter(),
)),
);
@ -384,8 +515,9 @@ mod tests {
let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
let bank = BankWithScheduler::new(
bank,
bank.clone(),
Some(setup_mocked_scheduler(
bank,
[
WaitReason::PausedForRecentBlockhash,
WaitReason::TerminatedToFreeze,
@ -414,6 +546,7 @@ mod tests {
));
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let mocked_scheduler = setup_mocked_scheduler_with_extra(
bank.clone(),
[WaitReason::DroppedFromBankForks].into_iter(),
Some(|mocked: &mut MockInstalledScheduler| {
mocked