Finish unified scheduler plumbing with min impl (#34300)

* Finalize unified scheduler plumbing with min impl

* Fix comment

* Rename leftover type name...

* Make logging text less ambiguous

* Make PhantomData simplyer without already used S

* Make TaskHandler stateless again

* Introduce HandlerContext to simplify TaskHandler

* Add comment for coexistence of Pool::{new,new_dyn}

* Fix grammar

* Remove confusing const for upcoming changes

* Demote InstalledScheduler::context() into dcou

* Delay drop of context up to return_to_pool()-ing

* Revert "Demote InstalledScheduler::context() into dcou"

This reverts commit 049a126c905df0ba8ad975c5cb1007ae90a21050.

* Revert "Delay drop of context up to return_to_pool()-ing"

This reverts commit 60b1bd2511a714690b0b2331e49bc3d0c72e3475.

* Make context handling really type-safe

* Update comment

* Fix grammar...

* Refine type aliases for boxed traits

* Swap the tuple order for readability & semantics

* Simplify PooledScheduler::result_with_timings type

* Restore .in_sequence()

* Use where for aesthetics

* Simplify if...

* Fix typo...

* Polish ::schedule_execution() a bit

* Fix rebase conflicts..

* Make test more readable

* Fix test failures after rebase...
This commit is contained in:
Ryo Onodera 2023-12-19 09:50:41 +09:00 committed by GitHub
parent 4a8d27d921
commit d2b5afc410
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1031 additions and 85 deletions

20
Cargo.lock generated
View File

@ -5972,6 +5972,7 @@ dependencies = [
"solana-tpu-client",
"solana-transaction-status",
"solana-turbine",
"solana-unified-scheduler-pool",
"solana-version",
"solana-vote",
"solana-vote-program",
@ -6431,6 +6432,7 @@ dependencies = [
"solana-storage-bigtable",
"solana-streamer",
"solana-transaction-status",
"solana-unified-scheduler-pool",
"solana-version",
"solana-vote-program",
"solana_rbpf",
@ -7544,6 +7546,24 @@ dependencies = [
"tokio",
]
[[package]]
name = "solana-unified-scheduler-logic"
version = "1.18.0"
[[package]]
name = "solana-unified-scheduler-pool"
version = "1.18.0"
dependencies = [
"assert_matches",
"solana-ledger",
"solana-logger",
"solana-program-runtime",
"solana-runtime",
"solana-sdk",
"solana-unified-scheduler-logic",
"solana-vote",
]
[[package]]
name = "solana-upload-perf"
version = "1.18.0"

View File

@ -108,6 +108,8 @@ members = [
"transaction-status",
"turbine",
"udp-client",
"unified-scheduler-logic",
"unified-scheduler-pool",
"upload-perf",
"validator",
"version",
@ -357,6 +359,8 @@ solana-pubsub-client = { path = "pubsub-client", version = "=1.18.0" }
solana-quic-client = { path = "quic-client", version = "=1.18.0" }
solana-rayon-threadlimit = { path = "rayon-threadlimit", version = "=1.18.0" }
solana-remote-wallet = { path = "remote-wallet", version = "=1.18.0", default-features = false }
solana-unified-scheduler-logic = { path = "unified-scheduler-logic", version = "=1.18.0" }
solana-unified-scheduler-pool = { path = "unified-scheduler-pool", version = "=1.18.0" }
solana-rpc = { path = "rpc", version = "=1.18.0" }
solana-rpc-client = { path = "rpc-client", version = "=1.18.0", default-features = false }
solana-rpc-client-api = { path = "rpc-client-api", version = "=1.18.0" }

View File

@ -39,4 +39,5 @@ $solana_ledger_tool create-snapshot --ledger config/ledger "$snapshot_slot" conf
cp config/ledger/genesis.tar.bz2 config/snapshot-ledger
$solana_ledger_tool copy --ledger config/ledger \
--target-db config/snapshot-ledger --starting-slot "$snapshot_slot" --ending-slot "$latest_slot"
$solana_ledger_tool verify --ledger config/snapshot-ledger
$solana_ledger_tool verify --ledger config/snapshot-ledger --block-verification-method blockstore-processor
$solana_ledger_tool verify --ledger config/snapshot-ledger --block-verification-method unified-scheduler

View File

@ -69,6 +69,7 @@ solana-streamer = { workspace = true }
solana-tpu-client = { workspace = true }
solana-transaction-status = { workspace = true }
solana-turbine = { workspace = true }
solana-unified-scheduler-pool = { workspace = true }
solana-version = { workspace = true }
solana-vote = { workspace = true }
solana-vote-program = { workspace = true }

View File

@ -118,6 +118,7 @@ use {
solana_send_transaction_service::send_transaction_service,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_turbine::{self, broadcast_stage::BroadcastStageType},
solana_unified_scheduler_pool::DefaultSchedulerPool,
solana_vote_program::vote_state,
solana_wen_restart::wen_restart::wait_for_wen_restart,
std::{
@ -144,6 +145,7 @@ const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
pub enum BlockVerificationMethod {
#[default]
BlockstoreProcessor,
UnifiedScheduler,
}
impl BlockVerificationMethod {
@ -813,6 +815,24 @@ impl Validator {
// (by both replay stage and banking stage)
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());
match &config.block_verification_method {
BlockVerificationMethod::BlockstoreProcessor => {
info!("no scheduler pool is installed for block verification...");
}
BlockVerificationMethod::UnifiedScheduler => {
let scheduler_pool = DefaultSchedulerPool::new_dyn(
config.runtime_config.log_messages_bytes_limit,
transaction_status_sender.clone(),
Some(replay_vote_sender.clone()),
prioritization_fee_cache.clone(),
);
bank_forks
.write()
.unwrap()
.install_scheduler_pool(scheduler_pool);
}
}
let leader_schedule_cache = Arc::new(leader_schedule_cache);
let entry_notification_sender = entry_notifier_service
.as_ref()

View File

@ -45,6 +45,7 @@ solana-stake-program = { workspace = true }
solana-storage-bigtable = { workspace = true }
solana-streamer = { workspace = true }
solana-transaction-status = { workspace = true }
solana-unified-scheduler-pool = { workspace = true }
solana-version = { workspace = true }
solana-vote-program = { workspace = true }
solana_rbpf = { workspace = true, features = ["debugger"] }

View File

@ -30,6 +30,7 @@ use {
PrunedBanksRequestHandler, SnapshotRequestHandler,
},
bank_forks::BankForks,
prioritization_fee_cache::PrioritizationFeeCache,
snapshot_config::SnapshotConfig,
snapshot_hash::StartingSnapshotHashes,
snapshot_utils::{
@ -42,6 +43,7 @@ use {
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
solana_unified_scheduler_pool::DefaultSchedulerPool,
std::{
path::{Path, PathBuf},
process::exit,
@ -305,6 +307,25 @@ pub fn load_and_process_ledger(
"Using: block-verification-method: {}",
block_verification_method,
);
match block_verification_method {
BlockVerificationMethod::BlockstoreProcessor => {
info!("no scheduler pool is installed for block verification...");
}
BlockVerificationMethod::UnifiedScheduler => {
let no_transaction_status_sender = None;
let no_replay_vote_sender = None;
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
bank_forks
.write()
.unwrap()
.install_scheduler_pool(DefaultSchedulerPool::new_dyn(
process_options.runtime_config.log_messages_bytes_limit,
no_transaction_status_sender,
no_replay_vote_sender,
ignored_prioritization_fee_cache,
));
}
}
let node_id = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(

View File

@ -74,7 +74,7 @@ use {
thiserror::Error,
};
struct TransactionBatchWithIndexes<'a, 'b> {
pub struct TransactionBatchWithIndexes<'a, 'b> {
pub batch: TransactionBatch<'a, 'b>,
pub transaction_indexes: Vec<usize>,
}
@ -134,7 +134,7 @@ fn get_first_error(
first_err
}
fn execute_batch(
pub fn execute_batch(
batch: &TransactionBatchWithIndexes,
bank: &Arc<Bank>,
transaction_status_sender: Option<&TransactionStatusSender>,
@ -1832,7 +1832,7 @@ pub struct TransactionStatusBatch {
pub transaction_indexes: Vec<usize>,
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct TransactionStatusSender {
pub sender: Sender<TransactionStatusMessage>,
}
@ -1947,7 +1947,9 @@ pub mod tests {
genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
},
installed_scheduler_pool::{MockInstalledScheduler, SchedulingContext, WaitReason},
installed_scheduler_pool::{
MockInstalledScheduler, MockUninstalledScheduler, SchedulingContext,
},
},
solana_sdk::{
account::{AccountSharedData, WritableAccount},
@ -4545,11 +4547,12 @@ pub mod tests {
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
let mut mocked_scheduler = MockInstalledScheduler::new();
let mut seq = mockall::Sequence::new();
let seq = Arc::new(Mutex::new(mockall::Sequence::new()));
let seq_cloned = seq.clone();
mocked_scheduler
.expect_context()
.times(1)
.in_sequence(&mut seq)
.in_sequence(&mut seq.lock().unwrap())
.return_const(context);
mocked_scheduler
.expect_schedule_execution()
@ -4557,15 +4560,21 @@ pub mod tests {
.returning(|_| ());
mocked_scheduler
.expect_wait_for_termination()
.with(mockall::predicate::eq(WaitReason::DroppedFromBankForks))
.with(mockall::predicate::eq(true))
.times(1)
.in_sequence(&mut seq)
.returning(|_| None);
mocked_scheduler
.expect_return_to_pool()
.times(1)
.in_sequence(&mut seq)
.returning(|| ());
.in_sequence(&mut seq.lock().unwrap())
.returning(move |_| {
let mut mocked_uninstalled_scheduler = MockUninstalledScheduler::new();
mocked_uninstalled_scheduler
.expect_return_to_pool()
.times(1)
.in_sequence(&mut seq_cloned.lock().unwrap())
.returning(|| ());
(
(Ok(()), ExecuteTimings::default()),
Box::new(mocked_uninstalled_scheduler),
)
});
let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler)));
let batch = bank.prepare_sanitized_batch(&txs);

View File

@ -4,6 +4,7 @@ use {
crossbeam_channel::{unbounded, Receiver},
gag::BufferRedirect,
log::*,
rand::seq::IteratorRandom,
serial_test::serial,
solana_accounts_db::{
accounts_db::create_accounts_run_and_snapshot_dirs, hardened_unpack::open_genesis_config,
@ -15,7 +16,7 @@ use {
},
optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
replay_stage::DUPLICATE_THRESHOLD,
validator::ValidatorConfig,
validator::{BlockVerificationMethod, ValidatorConfig},
},
solana_download_utils::download_snapshot_archive,
solana_entry::entry::create_ticks,
@ -5456,6 +5457,44 @@ fn test_duplicate_shreds_switch_failure() {
);
}
#[test]
#[serial]
fn test_randomly_mixed_block_verification_methods_between_bootstrap_and_not() {
// tailored logging just to see two block verification methods are working correctly
solana_logger::setup_with_default(
"solana_metrics::metrics=warn,\
solana_core=warn,\
solana_runtime::installed_scheduler_pool=trace,\
solana_ledger::blockstore_processor=debug,\
info",
);
let num_nodes = 2;
let mut config = ClusterConfig::new_with_equal_stakes(
num_nodes,
DEFAULT_CLUSTER_LAMPORTS,
DEFAULT_NODE_STAKE,
);
// Randomly switch to use unified scheduler
config
.validator_configs
.iter_mut()
.choose(&mut rand::thread_rng())
.unwrap()
.block_verification_method = BlockVerificationMethod::UnifiedScheduler;
let local = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
cluster_tests::spend_and_verify_all_nodes(
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
HashSet::new(),
SocketAddrSpace::Unspecified,
&local.connection_cache,
);
}
/// Forks previous marked invalid should be marked as such in fork choice on restart
#[test]
#[serial]

View File

@ -5041,6 +5041,7 @@ dependencies = [
"solana-tpu-client",
"solana-transaction-status",
"solana-turbine",
"solana-unified-scheduler-pool",
"solana-version",
"solana-vote",
"solana-vote-program",
@ -6545,6 +6546,22 @@ dependencies = [
"tokio",
]
[[package]]
name = "solana-unified-scheduler-logic"
version = "1.18.0"
[[package]]
name = "solana-unified-scheduler-pool"
version = "1.18.0"
dependencies = [
"solana-ledger",
"solana-program-runtime",
"solana-runtime",
"solana-sdk",
"solana-unified-scheduler-logic",
"solana-vote",
]
[[package]]
name = "solana-validator"
version = "1.18.0"

View File

@ -4290,7 +4290,7 @@ impl Bank {
}
/// Prepare a transaction batch from a single transaction without locking accounts
pub(crate) fn prepare_unlocked_batch_from_single_tx<'a>(
pub fn prepare_unlocked_batch_from_single_tx<'a>(
&'a self,
transaction: &'a SanitizedTransaction,
) -> TransactionBatch<'_, '_> {

View File

@ -39,7 +39,7 @@ use {
use {mockall::automock, qualifier_attr::qualifiers};
pub trait InstalledSchedulerPool: Send + Sync + Debug {
fn take_scheduler(&self, context: SchedulingContext) -> DefaultInstalledSchedulerBox;
fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox;
}
#[cfg_attr(doc, aquamarine::aquamarine)]
@ -107,28 +107,36 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static {
transaction_with_index: &'a (&'a SanitizedTransaction, usize),
);
/// Wait for a scheduler to terminate after it is notified with the given reason.
/// Wait for a scheduler to terminate after processing.
///
/// Firstly, this function blocks the current thread while waiting for the scheduler to
/// complete all of the executions for the scheduled transactions. This means the scheduler has
/// prepared the finalized `ResultWithTimings` at least internally at the time of existing from
/// this function. If no trsanction is scheduled, the result and timing will be `Ok(())` and
/// `ExecuteTimings::default()` respectively. This is done in the same way regardless of
/// `WaitReason`.
/// This function blocks the current thread while waiting for the scheduler to complete all of
/// the executions for the scheduled transactions and to return the finalized
/// `ResultWithTimings`. Along with the result, this function also makes the scheduler itself
/// uninstalled from the bank by transforming the consumed self.
///
/// After that, the scheduler may behave differently depending on the reason, regarding the
/// final bookkeeping. Specifically, this function guaranteed to return
/// `Some(finalized_result_with_timings)` unless the reason is `PausedForRecentBlockhash`. In
/// the case of `PausedForRecentBlockhash`, the scheduler is responsible to retain the
/// finalized `ResultWithTimings` until it's `wait_for_termination()`-ed with one of the other
/// two reasons later.
#[must_use]
fn wait_for_termination(&mut self, reason: &WaitReason) -> Option<ResultWithTimings>;
/// If no transaction is scheduled, the result and timing will be `Ok(())` and
/// `ExecuteTimings::default()` respectively.
fn wait_for_termination(
self: Box<Self>,
is_dropped: bool,
) -> (ResultWithTimings, UninstalledSchedulerBox);
/// Pause a scheduler after processing to update bank's recent blockhash.
///
/// This function blocks the current thread like wait_for_termination(). However, the scheduler
/// won't be consumed. This means the scheduler is responsible to retain the finalized
/// `ResultWithTimings` internally until it's `wait_for_termination()`-ed to collect the result
/// later.
fn pause_for_recent_blockhash(&mut self);
}
#[cfg_attr(feature = "dev-context-only-utils", automock)]
pub trait UninstalledScheduler: Send + Sync + Debug + 'static {
fn return_to_pool(self: Box<Self>);
}
pub type DefaultInstalledSchedulerBox = Box<dyn InstalledScheduler>;
pub type InstalledSchedulerBox = Box<dyn InstalledScheduler>;
pub type UninstalledSchedulerBox = Box<dyn UninstalledScheduler>;
pub type InstalledSchedulerPoolArc = Arc<dyn InstalledSchedulerPool>;
@ -165,9 +173,9 @@ impl SchedulingContext {
pub type ResultWithTimings = (Result<()>, ExecuteTimings);
/// A hint from the bank about the reason the caller is waiting on its scheduler termination.
/// A hint from the bank about the reason the caller is waiting on its scheduler.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum WaitReason {
enum WaitReason {
// The bank wants its scheduler to terminate after the completion of transaction execution, in
// order to freeze itself immediately thereafter. This is by far the most normal wait reason.
//
@ -178,8 +186,9 @@ pub enum WaitReason {
// The bank wants its scheduler to terminate just like `TerminatedToFreeze` and indicate that
// Drop::drop() is the caller.
DroppedFromBankForks,
// The bank wants its scheduler to pause the scheduler after the completion without being
// returned to the pool to collect scheduler's internally-held `ResultWithTimings` later.
// The bank wants its scheduler to pause after the completion without being returned to the
// pool. This is to update bank's recent blockhash and to collect scheduler's internally-held
// `ResultWithTimings` later.
PausedForRecentBlockhash,
}
@ -192,6 +201,15 @@ impl WaitReason {
WaitReason::TerminatedToFreeze | WaitReason::DroppedFromBankForks => false,
}
}
pub fn is_dropped(&self) -> bool {
// Exhaustive `match` is preferred here than `matches!()` to trigger an explicit
// decision to be made, should we add new variants like `PausedForFooBar`...
match self {
WaitReason::DroppedFromBankForks => true,
WaitReason::TerminatedToFreeze | WaitReason::PausedForRecentBlockhash => false,
}
}
}
/// Very thin wrapper around Arc<Bank>
@ -221,11 +239,11 @@ pub struct BankWithSchedulerInner {
bank: Arc<Bank>,
scheduler: InstalledSchedulerRwLock,
}
pub type InstalledSchedulerRwLock = RwLock<Option<DefaultInstalledSchedulerBox>>;
pub type InstalledSchedulerRwLock = RwLock<Option<InstalledSchedulerBox>>;
impl BankWithScheduler {
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<DefaultInstalledSchedulerBox>) -> Self {
pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<InstalledSchedulerBox>) -> Self {
if let Some(bank_in_context) = scheduler
.as_ref()
.map(|scheduler| scheduler.context().bank())
@ -341,18 +359,18 @@ impl BankWithSchedulerInner {
);
let mut scheduler = scheduler.write().unwrap();
let result_with_timings = if scheduler.is_some() {
let result_with_timings = scheduler
.as_mut()
.and_then(|scheduler| scheduler.wait_for_termination(&reason));
if !reason.is_paused() {
let scheduler = scheduler.take().expect("scheduler after waiting");
scheduler.return_to_pool();
}
result_with_timings
} else {
None
};
let result_with_timings =
if let Some(scheduler) = scheduler.as_mut().filter(|_| reason.is_paused()) {
scheduler.pause_for_recent_blockhash();
None
} else if let Some(scheduler) = scheduler.take() {
let (result_with_timings, uninstalled_scheduler) =
scheduler.wait_for_termination(reason.is_dropped());
uninstalled_scheduler.return_to_pool();
Some(result_with_timings)
} else {
None
};
debug!(
"wait_for_scheduler_termination(slot: {}, reason: {:?}): finished with: {:?}...",
bank.slot(),
@ -411,39 +429,42 @@ mod tests {
assert_matches::assert_matches,
mockall::Sequence,
solana_sdk::system_transaction,
std::sync::Mutex,
};
fn setup_mocked_scheduler_with_extra(
bank: Arc<Bank>,
wait_reasons: impl Iterator<Item = WaitReason>,
is_dropped_flags: impl Iterator<Item = bool>,
f: Option<impl Fn(&mut MockInstalledScheduler)>,
) -> DefaultInstalledSchedulerBox {
) -> InstalledSchedulerBox {
let mut mock = MockInstalledScheduler::new();
let mut seq = Sequence::new();
let seq = Arc::new(Mutex::new(Sequence::new()));
mock.expect_context()
.times(1)
.in_sequence(&mut seq)
.in_sequence(&mut seq.lock().unwrap())
.return_const(SchedulingContext::new(bank));
for wait_reason in wait_reasons {
for wait_reason in is_dropped_flags {
let seq_cloned = seq.clone();
mock.expect_wait_for_termination()
.with(mockall::predicate::eq(wait_reason))
.times(1)
.in_sequence(&mut seq)
.in_sequence(&mut seq.lock().unwrap())
.returning(move |_| {
if wait_reason.is_paused() {
None
} else {
Some((Ok(()), ExecuteTimings::default()))
}
let mut mock_uninstalled = MockUninstalledScheduler::new();
mock_uninstalled
.expect_return_to_pool()
.times(1)
.in_sequence(&mut seq_cloned.lock().unwrap())
.returning(|| ());
(
(Ok(()), ExecuteTimings::default()),
Box::new(mock_uninstalled),
)
});
}
mock.expect_return_to_pool()
.times(1)
.in_sequence(&mut seq)
.returning(|| ());
if let Some(f) = f {
f(&mut mock);
}
@ -453,11 +474,11 @@ mod tests {
fn setup_mocked_scheduler(
bank: Arc<Bank>,
wait_reasons: impl Iterator<Item = WaitReason>,
) -> DefaultInstalledSchedulerBox {
is_dropped_flags: impl Iterator<Item = bool>,
) -> InstalledSchedulerBox {
setup_mocked_scheduler_with_extra(
bank,
wait_reasons,
is_dropped_flags,
None::<fn(&mut MockInstalledScheduler) -> ()>,
)
}
@ -469,10 +490,7 @@ mod tests {
let bank = Arc::new(Bank::default_for_tests());
let bank = BankWithScheduler::new(
bank.clone(),
Some(setup_mocked_scheduler(
bank,
[WaitReason::TerminatedToFreeze].into_iter(),
)),
Some(setup_mocked_scheduler(bank, [false].into_iter())),
);
assert!(bank.has_installed_scheduler());
assert_matches!(bank.wait_for_completed_scheduler(), Some(_));
@ -502,10 +520,7 @@ mod tests {
let bank = Arc::new(Bank::default_for_tests());
let bank = BankWithScheduler::new(
bank.clone(),
Some(setup_mocked_scheduler(
bank,
[WaitReason::DroppedFromBankForks].into_iter(),
)),
Some(setup_mocked_scheduler(bank, [true].into_iter())),
);
drop(bank);
}
@ -517,13 +532,15 @@ mod tests {
let bank = Arc::new(crate::bank::tests::create_simple_test_bank(42));
let bank = BankWithScheduler::new(
bank.clone(),
Some(setup_mocked_scheduler(
Some(setup_mocked_scheduler_with_extra(
bank,
[
WaitReason::PausedForRecentBlockhash,
WaitReason::TerminatedToFreeze,
]
.into_iter(),
[false].into_iter(),
Some(|mocked: &mut MockInstalledScheduler| {
mocked
.expect_pause_for_recent_blockhash()
.times(1)
.returning(|| ());
}),
)),
);
goto_end_of_slot_with_scheduler(&bank);
@ -548,7 +565,7 @@ mod tests {
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let mocked_scheduler = setup_mocked_scheduler_with_extra(
bank.clone(),
[WaitReason::DroppedFromBankForks].into_iter(),
[true].into_iter(),
Some(|mocked: &mut MockInstalledScheduler| {
mocked
.expect_schedule_execution()

View File

@ -142,6 +142,7 @@ type SlotPrioritizationFee = DashMap<BankId, PrioritizationFee>;
/// Stores up to MAX_NUM_RECENT_BLOCKS recent block's prioritization fee,
/// A separate internal thread `service_thread` handles additional tasks when a bank is frozen,
/// and collecting stats and reporting metrics.
#[derive(Debug)]
pub struct PrioritizationFeeCache {
cache: Arc<RwLock<LruCache<Slot, Arc<SlotPrioritizationFee>>>>,
service_thread: Option<JoinHandle<()>>,

View File

@ -0,0 +1,10 @@
[package]
name = "solana-unified-scheduler-logic"
description = "The Solana unified scheduler logic"
documentation = "https://docs.rs/solana-unified-scheduler-logic"
version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }

View File

@ -0,0 +1 @@
// This file will be populated with actual implementation later.

View File

@ -0,0 +1,23 @@
[package]
name = "solana-unified-scheduler-pool"
description = "The Solana unified scheduler pool"
documentation = "https://docs.rs/solana-unified-scheduler-pool"
version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }
[dependencies]
solana-ledger = { workspace = true }
solana-program-runtime = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-unified-scheduler-logic = { workspace = true }
solana-vote = { workspace = true }
[dev-dependencies]
assert_matches = { workspace = true }
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }

View File

@ -0,0 +1,761 @@
//! Transaction scheduling code.
//!
//! This crate implements 3 solana-runtime traits (`InstalledScheduler`, `UninstalledScheduler` and
//! `InstalledSchedulerPool`) to provide a concrete transaction scheduling implementation
//! (including executing txes and committing tx results).
//!
//! At the highest level, this crate takes `SanitizedTransaction`s via its `schedule_execution()`
//! and commits any side-effects (i.e. on-chain state changes) into the associated `Bank` via
//! `solana-ledger`'s helper function called `execute_batch()`.
use {
solana_ledger::blockstore_processor::{
execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
},
solana_program_runtime::timings::ExecuteTimings,
solana_runtime::{
bank::Bank,
installed_scheduler_pool::{
InstalledScheduler, InstalledSchedulerBox, InstalledSchedulerPool,
InstalledSchedulerPoolArc, ResultWithTimings, SchedulerId, SchedulingContext,
UninstalledScheduler, UninstalledSchedulerBox,
},
prioritization_fee_cache::PrioritizationFeeCache,
},
solana_sdk::transaction::{Result, SanitizedTransaction},
solana_vote::vote_sender_types::ReplayVoteSender,
std::{
fmt::Debug,
marker::PhantomData,
sync::{
atomic::{AtomicU64, Ordering::Relaxed},
Arc, Mutex, Weak,
},
},
};
type AtomicSchedulerId = AtomicU64;
// SchedulerPool must be accessed as a dyn trait from solana-runtime, because SchedulerPool
// contains some internal fields, whose types aren't available in solana-runtime (currently
// TransactionStatusSender; also, PohRecorder in the future)...
#[derive(Debug)]
pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
scheduler_inners: Mutex<Vec<S::Inner>>,
handler_context: HandlerContext,
// weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to
// Arc<Self> from &Self, because SchedulerPool is used as in the form of Arc<SchedulerPool>
// almost always. But, this would cause wasted and noisy Arc::clone()'s at every call sites.
//
// Alternatively, `impl InstalledScheduler for Arc<SchedulerPool>` approach could be explored
// but it entails its own problems due to rustc's coherence and necessitated newtype with the
// type graph of InstalledScheduler being quite elaborate.
//
// After these considerations, this weak_self approach is chosen at the cost of some additional
// memory increase.
weak_self: Weak<Self>,
next_scheduler_id: AtomicSchedulerId,
_phantom: PhantomData<TH>,
}
#[derive(Debug)]
pub struct HandlerContext {
log_messages_bytes_limit: Option<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
}
pub type DefaultSchedulerPool =
SchedulerPool<PooledScheduler<DefaultTaskHandler>, DefaultTaskHandler>;
impl<S, TH> SchedulerPool<S, TH>
where
S: SpawnableScheduler<TH>,
TH: TaskHandler,
{
// Some internal impl and test code want an actual concrete type, NOT the
// `dyn InstalledSchedulerPool`. So don't merge this into `Self::new_dyn()`.
fn new(
log_messages_bytes_limit: Option<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> Arc<Self> {
Arc::new_cyclic(|weak_self| Self {
scheduler_inners: Mutex::default(),
handler_context: HandlerContext {
log_messages_bytes_limit,
transaction_status_sender,
replay_vote_sender,
prioritization_fee_cache,
},
weak_self: weak_self.clone(),
next_scheduler_id: AtomicSchedulerId::default(),
_phantom: PhantomData,
})
}
// This apparently-meaningless wrapper is handy, because some callers explicitly want
// `dyn InstalledSchedulerPool` to be returned for type inference convenience.
pub fn new_dyn(
log_messages_bytes_limit: Option<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> InstalledSchedulerPoolArc {
Self::new(
log_messages_bytes_limit,
transaction_status_sender,
replay_vote_sender,
prioritization_fee_cache,
)
}
// See a comment at the weak_self field for justification of this method's existence.
fn self_arc(&self) -> Arc<Self> {
self.weak_self
.upgrade()
.expect("self-referencing Arc-ed pool")
}
fn new_scheduler_id(&self) -> SchedulerId {
self.next_scheduler_id.fetch_add(1, Relaxed)
}
fn return_scheduler(&self, scheduler: S::Inner) {
self.scheduler_inners
.lock()
.expect("not poisoned")
.push(scheduler);
}
fn do_take_scheduler(&self, context: SchedulingContext) -> S {
// pop is intentional for filo, expecting relatively warmed-up scheduler due to having been
// returned recently
if let Some(inner) = self.scheduler_inners.lock().expect("not poisoned").pop() {
S::from_inner(inner, context)
} else {
S::spawn(self.self_arc(), context)
}
}
}
impl<S, TH> InstalledSchedulerPool for SchedulerPool<S, TH>
where
S: SpawnableScheduler<TH>,
TH: TaskHandler,
{
fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
Box::new(self.do_take_scheduler(context))
}
}
pub trait TaskHandler: Send + Sync + Debug + Sized + 'static {
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
bank: &Arc<Bank>,
transaction: &SanitizedTransaction,
index: usize,
handler_context: &HandlerContext,
);
}
#[derive(Debug)]
pub struct DefaultTaskHandler;
impl TaskHandler for DefaultTaskHandler {
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
bank: &Arc<Bank>,
transaction: &SanitizedTransaction,
index: usize,
handler_context: &HandlerContext,
) {
// scheduler must properly prevent conflicting tx executions. thus, task handler isn't
// responsible for locking.
let batch = bank.prepare_unlocked_batch_from_single_tx(transaction);
let batch_with_indexes = TransactionBatchWithIndexes {
batch,
transaction_indexes: vec![index],
};
*result = execute_batch(
&batch_with_indexes,
bank,
handler_context.transaction_status_sender.as_ref(),
handler_context.replay_vote_sender.as_ref(),
timings,
handler_context.log_messages_bytes_limit,
&handler_context.prioritization_fee_cache,
);
}
}
// Currently, simplest possible implementation (i.e. single-threaded)
// this will be replaced with more proper implementation...
// not usable at all, especially for mainnet-beta
#[derive(Debug)]
pub struct PooledScheduler<TH: TaskHandler> {
inner: PooledSchedulerInner<Self, TH>,
context: SchedulingContext,
result_with_timings: Mutex<ResultWithTimings>,
}
#[derive(Debug)]
pub struct PooledSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> {
id: SchedulerId,
pool: Arc<SchedulerPool<S, TH>>,
}
impl<TH: TaskHandler> PooledScheduler<TH> {
fn do_spawn(pool: Arc<SchedulerPool<Self, TH>>, initial_context: SchedulingContext) -> Self {
Self::from_inner(
PooledSchedulerInner::<Self, TH> {
id: pool.new_scheduler_id(),
pool,
},
initial_context,
)
}
}
pub trait SpawnableScheduler<TH: TaskHandler>: InstalledScheduler {
type Inner: Debug + Send + Sync;
fn into_inner(self) -> (ResultWithTimings, Self::Inner);
fn from_inner(inner: Self::Inner, context: SchedulingContext) -> Self;
fn spawn(pool: Arc<SchedulerPool<Self, TH>>, initial_context: SchedulingContext) -> Self
where
Self: Sized;
}
impl<TH: TaskHandler> SpawnableScheduler<TH> for PooledScheduler<TH> {
type Inner = PooledSchedulerInner<Self, TH>;
fn into_inner(self) -> (ResultWithTimings, Self::Inner) {
(
self.result_with_timings.into_inner().expect("not poisoned"),
self.inner,
)
}
fn from_inner(inner: Self::Inner, context: SchedulingContext) -> Self {
Self {
inner,
context,
result_with_timings: Mutex::new((Ok(()), ExecuteTimings::default())),
}
}
fn spawn(pool: Arc<SchedulerPool<Self, TH>>, initial_context: SchedulingContext) -> Self {
Self::do_spawn(pool, initial_context)
}
}
impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
fn id(&self) -> SchedulerId {
self.inner.id
}
fn context(&self) -> &SchedulingContext {
&self.context
}
fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) {
let (result, timings) = &mut *self.result_with_timings.lock().expect("not poisoned");
if result.is_err() {
// just bail out early to short-circuit the processing altogether
return;
}
// ... so, we're NOT scheduling at all here; rather, just execute tx straight off. the
// inter-tx locking deps aren't needed to be resolved in the case of single-threaded FIFO
// like this.
TH::handle(
result,
timings,
self.context().bank(),
transaction,
index,
&self.inner.pool.handler_context,
);
}
fn wait_for_termination(
self: Box<Self>,
_is_dropped: bool,
) -> (ResultWithTimings, UninstalledSchedulerBox) {
let (result_with_timings, uninstalled_scheduler) = self.into_inner();
(result_with_timings, Box::new(uninstalled_scheduler))
}
fn pause_for_recent_blockhash(&mut self) {
// not surprisingly, there's nothing to do for this min impl!
}
}
impl<S, TH> UninstalledScheduler for PooledSchedulerInner<S, TH>
where
S: SpawnableScheduler<TH, Inner = PooledSchedulerInner<S, TH>>,
TH: TaskHandler,
{
fn return_to_pool(self: Box<Self>) {
self.pool.clone().return_scheduler(*self)
}
}
#[cfg(test)]
mod tests {
use {
super::*,
assert_matches::assert_matches,
solana_runtime::{
bank::Bank,
bank_forks::BankForks,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
installed_scheduler_pool::{BankWithScheduler, SchedulingContext},
prioritization_fee_cache::PrioritizationFeeCache,
},
solana_sdk::{
clock::MAX_PROCESSING_AGE,
pubkey::Pubkey,
signer::keypair::Keypair,
system_transaction,
transaction::{SanitizedTransaction, TransactionError},
},
std::{sync::Arc, thread::JoinHandle},
};
#[test]
fn test_scheduler_pool_new() {
solana_logger::setup();
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
// this indirectly proves that there should be circular link because there's only one Arc
// at this moment now
assert_eq!((Arc::strong_count(&pool), Arc::weak_count(&pool)), (1, 1));
let debug = format!("{pool:#?}");
assert!(!debug.is_empty());
}
#[test]
fn test_scheduler_spawn() {
solana_logger::setup();
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
let bank = Arc::new(Bank::default_for_tests());
let context = SchedulingContext::new(bank);
let scheduler = pool.take_scheduler(context);
let debug = format!("{scheduler:#?}");
assert!(!debug.is_empty());
}
#[test]
fn test_scheduler_pool_filo() {
solana_logger::setup();
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
let bank = Arc::new(Bank::default_for_tests());
let context = &SchedulingContext::new(bank);
let scheduler1 = pool.do_take_scheduler(context.clone());
let scheduler_id1 = scheduler1.id();
let scheduler2 = pool.do_take_scheduler(context.clone());
let scheduler_id2 = scheduler2.id();
assert_ne!(scheduler_id1, scheduler_id2);
let (result_with_timings, scheduler1) = scheduler1.into_inner();
assert_matches!(result_with_timings, (Ok(()), _));
pool.return_scheduler(scheduler1);
let (result_with_timings, scheduler2) = scheduler2.into_inner();
assert_matches!(result_with_timings, (Ok(()), _));
pool.return_scheduler(scheduler2);
let scheduler3 = pool.do_take_scheduler(context.clone());
assert_eq!(scheduler_id2, scheduler3.id());
let scheduler4 = pool.do_take_scheduler(context.clone());
assert_eq!(scheduler_id1, scheduler4.id());
}
#[test]
fn test_scheduler_pool_context_drop_unless_reinitialized() {
solana_logger::setup();
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
let bank = Arc::new(Bank::default_for_tests());
let context = &SchedulingContext::new(bank);
let mut scheduler = pool.do_take_scheduler(context.clone());
// should never panic.
scheduler.pause_for_recent_blockhash();
assert_matches!(
Box::new(scheduler).wait_for_termination(false),
((Ok(()), _), _)
);
}
#[test]
fn test_scheduler_pool_context_replace() {
solana_logger::setup();
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
let old_bank = &Arc::new(Bank::default_for_tests());
let new_bank = &Arc::new(Bank::default_for_tests());
assert!(!Arc::ptr_eq(old_bank, new_bank));
let old_context = &SchedulingContext::new(old_bank.clone());
let new_context = &SchedulingContext::new(new_bank.clone());
let scheduler = pool.do_take_scheduler(old_context.clone());
let scheduler_id = scheduler.id();
pool.return_scheduler(scheduler.into_inner().1);
let scheduler = pool.take_scheduler(new_context.clone());
assert_eq!(scheduler_id, scheduler.id());
assert!(Arc::ptr_eq(scheduler.context().bank(), new_bank));
}
#[test]
fn test_scheduler_pool_install_into_bank_forks() {
solana_logger::setup();
let bank = Bank::default_for_tests();
let bank_forks = BankForks::new_rw_arc(bank);
let mut bank_forks = bank_forks.write().unwrap();
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
bank_forks.install_scheduler_pool(pool);
}
#[test]
fn test_scheduler_install_into_bank() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let child_bank = Bank::new_from_parent(bank, &Pubkey::default(), 1);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
let bank = Bank::default_for_tests();
let bank_forks = BankForks::new_rw_arc(bank);
let mut bank_forks = bank_forks.write().unwrap();
// existing banks in bank_forks shouldn't process transactions anymore in general, so
// shouldn't be touched
assert!(!bank_forks
.working_bank_with_scheduler()
.has_installed_scheduler());
bank_forks.install_scheduler_pool(pool);
assert!(!bank_forks
.working_bank_with_scheduler()
.has_installed_scheduler());
let mut child_bank = bank_forks.insert(child_bank);
assert!(child_bank.has_installed_scheduler());
bank_forks.remove(child_bank.slot());
child_bank.drop_scheduler();
assert!(!child_bank.has_installed_scheduler());
}
fn setup_dummy_fork_graph(bank: Bank) -> Arc<Bank> {
let slot = bank.slot();
let bank_fork = BankForks::new_rw_arc(bank);
let bank = bank_fork.read().unwrap().get(slot).unwrap();
bank.loaded_programs_cache
.write()
.unwrap()
.set_fork_graph(bank_fork);
bank
}
#[test]
fn test_scheduler_schedule_execution_success() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let tx0 = &SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_sdk::pubkey::new_rand(),
2,
genesis_config.hash(),
));
let bank = Bank::new_for_tests(&genesis_config);
let bank = setup_dummy_fork_graph(bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
let context = SchedulingContext::new(bank.clone());
assert_eq!(bank.transaction_count(), 0);
let scheduler = pool.take_scheduler(context);
scheduler.schedule_execution(&(tx0, 0));
let bank = BankWithScheduler::new(bank, Some(scheduler));
assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _)));
assert_eq!(bank.transaction_count(), 1);
}
#[test]
fn test_scheduler_schedule_execution_failure() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank = setup_dummy_fork_graph(bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
let context = SchedulingContext::new(bank.clone());
let mut scheduler = pool.take_scheduler(context);
let unfunded_keypair = Keypair::new();
let bad_tx =
&SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&unfunded_keypair,
&solana_sdk::pubkey::new_rand(),
2,
genesis_config.hash(),
));
assert_eq!(bank.transaction_count(), 0);
scheduler.schedule_execution(&(bad_tx, 0));
scheduler.pause_for_recent_blockhash();
assert_eq!(bank.transaction_count(), 0);
let good_tx_after_bad_tx =
&SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_sdk::pubkey::new_rand(),
3,
genesis_config.hash(),
));
// make sure this tx is really a good one to execute.
assert_matches!(
bank.simulate_transaction_unchecked(good_tx_after_bad_tx, false)
.result,
Ok(_)
);
scheduler.schedule_execution(&(good_tx_after_bad_tx, 0));
scheduler.pause_for_recent_blockhash();
// transaction_count should remain same as scheduler should be bailing out.
assert_eq!(bank.transaction_count(), 0);
let bank = BankWithScheduler::new(bank, Some(scheduler));
assert_matches!(
bank.wait_for_completed_scheduler(),
Some((
Err(solana_sdk::transaction::TransactionError::AccountNotFound),
_timings
))
);
}
#[derive(Debug)]
struct AsyncScheduler<const TRIGGER_RACE_CONDITION: bool>(
PooledScheduler<DefaultTaskHandler>,
Mutex<Vec<JoinHandle<ResultWithTimings>>>,
);
impl<const TRIGGER_RACE_CONDITION: bool> AsyncScheduler<TRIGGER_RACE_CONDITION> {
fn do_wait(&self) {
let mut overall_result = Ok(());
let mut overall_timings = ExecuteTimings::default();
for handle in self.1.lock().unwrap().drain(..) {
let (result, timings) = handle.join().unwrap();
match result {
Ok(()) => {}
Err(e) => overall_result = Err(e),
}
overall_timings.accumulate(&timings);
}
*self.0.result_with_timings.lock().unwrap() = (overall_result, overall_timings);
}
}
impl<const TRIGGER_RACE_CONDITION: bool> InstalledScheduler
for AsyncScheduler<TRIGGER_RACE_CONDITION>
{
fn id(&self) -> SchedulerId {
self.0.id()
}
fn context(&self) -> &SchedulingContext {
self.0.context()
}
fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) {
let transaction_and_index = (transaction.clone(), index);
let context = self.context().clone();
let pool = self.0.inner.pool.clone();
self.1.lock().unwrap().push(std::thread::spawn(move || {
// intentionally sleep to simulate race condition where register_recent_blockhash
// is handle before finishing executing scheduled transactions
std::thread::sleep(std::time::Duration::from_secs(1));
let mut result = Ok(());
let mut timings = ExecuteTimings::default();
<DefaultTaskHandler as TaskHandler>::handle(
&mut result,
&mut timings,
context.bank(),
&transaction_and_index.0,
transaction_and_index.1,
&pool.handler_context,
);
(result, timings)
}));
}
fn wait_for_termination(
self: Box<Self>,
is_dropped: bool,
) -> (ResultWithTimings, UninstalledSchedulerBox) {
self.do_wait();
Box::new(self.0).wait_for_termination(is_dropped)
}
fn pause_for_recent_blockhash(&mut self) {
if TRIGGER_RACE_CONDITION {
// this is equivalent to NOT calling wait_for_paused_scheduler() in
// register_recent_blockhash().
return;
}
self.do_wait();
}
}
impl<const TRIGGER_RACE_CONDITION: bool> SpawnableScheduler<DefaultTaskHandler>
for AsyncScheduler<TRIGGER_RACE_CONDITION>
{
// well, i wish i can use ! (never type).....
type Inner = Self;
fn into_inner(self) -> (ResultWithTimings, Self::Inner) {
todo!();
}
fn from_inner(_inner: Self::Inner, _context: SchedulingContext) -> Self {
todo!();
}
fn spawn(
pool: Arc<SchedulerPool<Self, DefaultTaskHandler>>,
initial_context: SchedulingContext,
) -> Self {
AsyncScheduler::<TRIGGER_RACE_CONDITION>(
PooledScheduler::<DefaultTaskHandler>::from_inner(
PooledSchedulerInner {
id: pool.new_scheduler_id(),
pool: SchedulerPool::new(
pool.handler_context.log_messages_bytes_limit,
pool.handler_context.transaction_status_sender.clone(),
pool.handler_context.replay_vote_sender.clone(),
pool.handler_context.prioritization_fee_cache.clone(),
),
},
initial_context,
),
Mutex::new(vec![]),
)
}
}
fn do_test_scheduler_schedule_execution_recent_blockhash_edge_case<
const TRIGGER_RACE_CONDITION: bool,
>() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let very_old_valid_tx =
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&solana_sdk::pubkey::new_rand(),
2,
genesis_config.hash(),
));
let mut bank = Bank::new_for_tests(&genesis_config);
for _ in 0..MAX_PROCESSING_AGE {
bank.fill_bank_with_ticks_for_tests();
bank.freeze();
let slot = bank.slot();
bank = Bank::new_from_parent(
Arc::new(bank),
&Pubkey::default(),
slot.checked_add(1).unwrap(),
);
}
let bank = setup_dummy_fork_graph(bank);
let context = SchedulingContext::new(bank.clone());
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
SchedulerPool::<AsyncScheduler<TRIGGER_RACE_CONDITION>, DefaultTaskHandler>::new_dyn(
None,
None,
None,
ignored_prioritization_fee_cache,
);
let scheduler = pool.take_scheduler(context);
let bank = BankWithScheduler::new(bank, Some(scheduler));
assert_eq!(bank.transaction_count(), 0);
// schedule but not immediately execute transaction
bank.schedule_transaction_executions([(&very_old_valid_tx, &0)].into_iter());
// this calls register_recent_blockhash internally
bank.fill_bank_with_ticks_for_tests();
if TRIGGER_RACE_CONDITION {
// very_old_valid_tx is wrongly handled as expired!
assert_matches!(
bank.wait_for_completed_scheduler(),
Some((Err(TransactionError::BlockhashNotFound), _))
);
assert_eq!(bank.transaction_count(), 0);
} else {
assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _)));
assert_eq!(bank.transaction_count(), 1);
}
}
#[test]
fn test_scheduler_schedule_execution_recent_blockhash_edge_case_with_race() {
do_test_scheduler_schedule_execution_recent_blockhash_edge_case::<true>();
}
#[test]
fn test_scheduler_schedule_execution_recent_blockhash_edge_case_without_race() {
do_test_scheduler_schedule_execution_recent_blockhash_edge_case::<false>();
}
}