Introduce SchedulingStateMachine for unified scheduler (#129)
* Introduce SchedulingStateMachine
* Apply all typo fixes from code review
  Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>
* Update word wrapping
* Clarify Token::assume_exclusive_mutating_thread()
* Use slice instead of &Vec<_>
* Improve non-const explanation
* Document consecutive readonly rescheduling opt.
* Make test_gradual_locking terminate for miri
* Avoid unnecessary Task::clone()
* Rename: lock_{status,result} and no attempt_...()
* Add safety comment for get_account_locks_unchecked
* Reduce and comment about Page::blocked_tasks cap.
* Document SchedulingStateMachine::schedule_task()
* Add justification of closure in create_task
* Use the From trait for PageUsage
* Replace unneeded if-let with .expect()
* Add helpful comments for peculiar crossbeam usage
* Fix typo
* Make bug-bounty-exempt statement more clear
* Add test_enfoced_get_account_locks_verification
* Fix typos...
* Big rename: Page => UsageQueue
* Document UsageQueueLoader
* Various minor cleanings for beautifier diff
* Ensure reinitialize() is maintained for new fields
* Remove uneeded impl Send for TokenCell & doc upd.
* Apply typo fixes from code review
  Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>
* Merge similar tests into one
* Remove test_debug
* Remove assertions of task_index()
* Fix UB in TokenCell
* Make schedule_task doc comment simpler
* Document deschedule_task
* Simplify unlock_usage_queue() args
* Add comment for try_unblock() -> None
* Switch to Option<Usage> for fewer assert!s
* Add assert_matches!() to UsageQueue methods
* Add panicking test case for ::reinitialize()
* Use UsageFromTask
* Rename: LockAttempt => LockContext
* Move locking and unlocking methods to usage queue
* Remove outdated comment...
* Remove redundant fn: pop_unblocked_usage_from_task
* Document the index of task
* Clarifty comment a bit
* Update .current_usage inside try_lock()
* Use inspect_err to simplify code
* fix ci...
* Use ()...
* Rename: schedule{,_next}_unblocked_task()
* Rename: {try_lock,unlock}_{for_task,usage_queues}
* Test solana-unified-scheduler-logic under miri
* Test UB to illustrate limitation of TokenCell
* Test UB of using multiple tokens at the same time

---------

Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>
Parent: 855a0c1a92
Commit: 0b9c6379b3
@@ -7552,7 +7552,9 @@ dependencies = [
name = "solana-unified-scheduler-logic"
version = "2.0.0"
dependencies = [
 "assert_matches",
 "solana-sdk",
 "static_assertions",
]

[[package]]
@@ -7561,6 +7563,7 @@ version = "2.0.0"
dependencies = [
 "assert_matches",
 "crossbeam-channel",
 "dashmap",
 "derivative",
 "log",
 "solana-ledger",
@@ -2,7 +2,16 @@

set -eo pipefail

source ci/_
source ci/rust-version.sh nightly

# miri is very slow; so only run a few selected tests!
cargo "+${rust_nightly}" miri test -p solana-program -- hash:: account_info::
_ cargo "+${rust_nightly}" miri test -p solana-program -- hash:: account_info::

_ cargo "+${rust_nightly}" miri test -p solana-unified-scheduler-logic

# run intentionally-#[ignored] ub triggering tests for each to make sure they fail
(! _ cargo "+${rust_nightly}" miri test -p solana-unified-scheduler-logic -- \
  --ignored --exact "utils::tests::test_ub_illegally_created_multiple_tokens")
(! _ cargo "+${rust_nightly}" miri test -p solana-unified-scheduler-logic -- \
  --ignored --exact "utils::tests::test_ub_illegally_shared_token_cell")
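The CI change above runs deliberately `#[ignore]`d, UB-triggering tests under Miri and wraps them in `(! ...)` so that Miri's failure is the expected, passing outcome. As a hedged illustration only (not one of the crate's actual `test_ub_*` tests), such a test could look like the following sketch:

```rust
// Hypothetical illustration only -- not an actual test from solana-unified-scheduler-logic.
// The #[ignore] attribute keeps it out of normal `cargo test`; running it explicitly with
// `cargo miri test -- --ignored --exact <name>` is expected to FAIL, which the `(! ...)`
// wrapper in the CI script turns into a passing check.
#[test]
#[ignore]
fn test_ub_example_aliasing_mut_refs() {
    let mut value = 0u8;
    let ptr = &mut value as *mut u8;
    // Two live &mut references derived from the same pointer alias each other; using the
    // first one after creating the second is undefined behavior that Miri detects.
    let a = unsafe { &mut *ptr };
    let b = unsafe { &mut *ptr };
    *a = 1;
    *b = 2;
}
```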
@@ -6527,7 +6527,9 @@ dependencies = [
name = "solana-unified-scheduler-logic"
version = "2.0.0"
dependencies = [
 "assert_matches",
 "solana-sdk",
 "static_assertions",
]

[[package]]
@@ -6536,6 +6538,7 @@ version = "2.0.0"
dependencies = [
 "assert_matches",
 "crossbeam-channel",
 "dashmap",
 "derivative",
 "log",
 "solana-ledger",
@@ -3562,6 +3562,9 @@ impl Bank {
transaction: &'a SanitizedTransaction,
) -> TransactionBatch<'_, '_> {
let tx_account_lock_limit = self.get_transaction_account_lock_limit();
// Note that switching this to .get_account_locks_unchecked() is unacceptable currently.
// The unified scheduler relies on the checks enforced here.
// See a comment in SchedulingStateMachine::create_task().
let lock_result = transaction
    .get_account_locks(tx_account_lock_limit)
    .map(|_| ());
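The comment in this hunk is the reason the checked lock path must stay in place. As a hedged sketch of the distinction it refers to: `get_account_locks()` re-validates the transaction's account list (for example rejecting duplicate keys with `AccountLoadedTwice`, or too many locked accounts), whereas `get_account_locks_unchecked()` skips that validation; the unified scheduler's `create_task()` assumes the checked variant already ran here.

```rust
// Hedged sketch (restating the checked path above as a standalone helper).
use solana_sdk::transaction::{Result, SanitizedTransaction};

fn checked_lock_result(transaction: &SanitizedTransaction, lock_limit: usize) -> Result<()> {
    // Fails with e.g. TransactionError::AccountLoadedTwice on duplicate account keys,
    // which is exactly what the unified scheduler relies on being caught before scheduling.
    transaction.get_account_locks(lock_limit).map(|_| ())
}
```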
@@ -10,4 +10,6 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
assert_matches = { workspace = true }
solana-sdk = { workspace = true }
static_assertions = { workspace = true }
File diff suppressed because it is too large
@@ -12,6 +12,7 @@ edition = { workspace = true }
[dependencies]
assert_matches = { workspace = true }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true }
derivative = { workspace = true }
log = { workspace = true }
solana-ledger = { workspace = true }
@@ -1,3 +1,8 @@
//! NOTE: While the unified scheduler is fully functional and moderately performant even with
//! mainnet-beta, it has known resource-exhaustion related security issues for replaying
//! specially-crafted blocks produced by malicious leaders. Thus, this experimental and
//! nondefault functionality is exempt from the bug bounty program for now.
//!
//! Transaction scheduling code.
//!
//! This crate implements 3 solana-runtime traits (`InstalledScheduler`, `UninstalledScheduler` and
@@ -10,7 +15,8 @@

use {
    assert_matches::assert_matches,
    crossbeam_channel::{select, unbounded, Receiver, SendError, Sender},
    crossbeam_channel::{never, select, unbounded, Receiver, RecvError, SendError, Sender},
    dashmap::DashMap,
    derivative::Derivative,
    log::*,
    solana_ledger::blockstore_processor::{
@@ -26,8 +32,11 @@ use {
    },
    prioritization_fee_cache::PrioritizationFeeCache,
    },
    solana_sdk::transaction::{Result, SanitizedTransaction},
    solana_unified_scheduler_logic::Task,
    solana_sdk::{
        pubkey::Pubkey,
        transaction::{Result, SanitizedTransaction},
    },
    solana_unified_scheduler_logic::{SchedulingStateMachine, Task, UsageQueue},
    solana_vote::vote_sender_types::ReplayVoteSender,
    std::{
        fmt::Debug,
@@ -90,10 +99,8 @@ where
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> Arc<Self> {
let handler_count = handler_count.unwrap_or(1);
// we're hard-coding the number of handler thread to 1, meaning this impl is currently
// single-threaded still.
assert_eq!(handler_count, 1); // replace this with assert!(handler_count >= 1) later
let handler_count = handler_count.unwrap_or(Self::default_handler_count());
assert!(handler_count >= 1);

Arc::new_cyclic(|weak_self| Self {
    scheduler_inners: Mutex::default(),
@@ -386,13 +393,35 @@ mod chained_channel {
    }
}

/// The primary owner of all [`UsageQueue`]s used for a particular [`PooledScheduler`].
///
/// Currently, the simplest implementation. This grows memory usage in an unbounded way. Cleaning
/// will be added later. This struct is here to be put outside `solana-unified-scheduler-logic` for
/// the crate's original intent (separation of logic from this crate). Some practical and mundane
/// pruning will be implemented in this type.
#[derive(Default, Debug)]
pub struct UsageQueueLoader {
    usage_queues: DashMap<Pubkey, UsageQueue>,
}

impl UsageQueueLoader {
    pub fn load(&self, address: Pubkey) -> UsageQueue {
        self.usage_queues.entry(address).or_default().clone()
    }
}
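A hedged usage sketch of the loader defined above: the `entry(address).or_default().clone()` line memoizes one `UsageQueue` per address, so repeated loads of the same `Pubkey` hand back handles to the same shared queue. The names below are illustrative only.

```rust
// Illustrative only; depends on UsageQueueLoader as defined above.
use solana_sdk::pubkey::Pubkey;

fn example(loader: &UsageQueueLoader) {
    let payer = Pubkey::new_unique();
    let q1 = loader.load(payer);                // first load inserts a fresh UsageQueue
    let q2 = loader.load(payer);                // second load clones a handle to the same entry
    let q3 = loader.load(Pubkey::new_unique()); // a distinct queue for a distinct address
    // q1/q2 refer to one shared queue, q3 to another; this sharing is what lets
    // SchedulingStateMachine see two tasks touching the same account as conflicting.
    let _ = (q1, q2, q3);
}
```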

// (this is slow, needing atomic memory reads. However, this can be turned into a much faster,
// optimizer-friendly version as shown in this crossbeam pr:
// https://github.com/crossbeam-rs/crossbeam/pull/1047)
fn disconnected<T>() -> Receiver<T> {
    // drop the sender residing at .0, returning an always-disconnected receiver.
    unbounded().1
}

fn initialized_result_with_timings() -> ResultWithTimings {
    (Ok(()), ExecuteTimings::default())
}

// Currently, simplest possible implementation (i.e. single-threaded)
// this will be replaced with more proper implementation...
// not usable at all, especially for mainnet-beta
#[derive(Debug)]
pub struct PooledScheduler<TH: TaskHandler> {
    inner: PooledSchedulerInner<Self, TH>,
@@ -402,6 +431,7 @@ pub struct PooledScheduler<TH: TaskHandler> {
#[derive(Debug)]
pub struct PooledSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> {
    thread_manager: ThreadManager<S, TH>,
    usage_queue_loader: UsageQueueLoader,
}

// This type manages the OS threads for scheduling and executing transactions. The term
@@ -427,6 +457,7 @@ impl<TH: TaskHandler> PooledScheduler<TH> {
Self::from_inner(
    PooledSchedulerInner::<Self, TH> {
        thread_manager: ThreadManager::new(pool),
        usage_queue_loader: UsageQueueLoader::default(),
    },
    initial_context,
)
@@ -518,7 +549,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let new_task_receiver = self.new_task_receiver.clone();

let mut session_ending = false;
let mut active_task_count: usize = 0;

// Now, this is the main loop for the scheduler thread, which is a special beast.
//
@@ -558,61 +588,99 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// cycles out of the scheduler thread. Thus, any kinds of unessential overhead sources
// like syscalls, VDSO, and even memory (de)allocation should be avoided at all costs
// by design or by means of offloading at the last resort.
move || loop {
let mut is_finished = false;
while !is_finished {
select! {
recv(finished_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();
move || {
let (do_now, dont_now) = (&disconnected::<()>(), &never::<()>());
let dummy_receiver = |trigger| {
    if trigger {
        do_now
    } else {
        dont_now
    }
};

active_task_count = active_task_count.checked_sub(1).unwrap();
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
},
recv(new_task_receiver) -> message => {
assert!(!session_ending);
let mut state_machine = unsafe {
    SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
};

match message.unwrap() {
NewTaskPayload::Payload(task) => {
// so, we're NOT scheduling at all here; rather, just execute
// tx straight off. the inter-tx locking deps aren't needed to
// be resolved in the case of single-threaded FIFO like this.
runnable_task_sender
    .send_payload(task)
    .unwrap();
active_task_count = active_task_count.checked_add(1).unwrap();
loop {
let mut is_finished = false;
while !is_finished {
// ALL recv selectors are eager-evaluated ALWAYS by current crossbeam impl,
// which isn't great and is inconsistent with `if`s in the Rust's match
// arm. So, eagerly binding the result to a variable unconditionally here
// makes no perf. difference...
let dummy_unblocked_task_receiver =
    dummy_receiver(state_machine.has_unblocked_task());

// (Assume this is biased; i.e. select_biased! in this crossbeam pr:
// https://github.com/rust-lang/futures-rs/pull/1976)
//
// There's something special called dummy_unblocked_task_receiver here.
// This odd pattern was needed to react to newly unblocked tasks from
// _not-crossbeam-channel_ event sources, precisely at the specified
// precedence among other selectors, while delegating the control flow to
// select_biased!.
//
// In this way, hot looping is avoided and overall control flow is much more
// consistent. Note that the unified scheduler will go
// into busy looping to seek lowest latency eventually. However, not now,
// to measure _actual_ cpu usage easily with the select approach.
select! {
recv(finished_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();

state_machine.deschedule_task(&executed_task.task);
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
},
recv(dummy_unblocked_task_receiver) -> dummy => {
assert_matches!(dummy, Err(RecvError));

let task = state_machine
    .schedule_next_unblocked_task()
    .expect("unblocked task");
runnable_task_sender.send_payload(task).unwrap();
},
recv(new_task_receiver) -> message => {
assert!(!session_ending);

match message.unwrap() {
NewTaskPayload::Payload(task) => {
    if let Some(task) = state_machine.schedule_task(task) {
        runnable_task_sender.send_payload(task).unwrap();
    }
}
NewTaskPayload::OpenSubchannel(context) => {
    // signal about new SchedulingContext to handler threads
    runnable_task_sender
        .send_chained_channel(context, handler_count)
        .unwrap();
    assert_matches!(
        result_with_timings.replace(initialized_result_with_timings()),
        None
    );
}
NewTaskPayload::CloseSubchannel => {
    session_ending = true;
}
}
NewTaskPayload::OpenSubchannel(context) => {
    // signal about new SchedulingContext to handler threads
    runnable_task_sender
        .send_chained_channel(context, handler_count)
        .unwrap();
    assert_matches!(
        result_with_timings.replace(initialized_result_with_timings()),
        None
    );
}
NewTaskPayload::CloseSubchannel => {
    session_ending = true;
}
}
},
};
},
};

// a really simplistic termination condition, which only works under the
// assumption of single handler thread...
is_finished = session_ending && active_task_count == 0;
}
is_finished = session_ending && state_machine.has_no_active_task();
}

if session_ending {
session_result_sender
    .send(Some(
        result_with_timings
            .take()
            .unwrap_or_else(initialized_result_with_timings),
    ))
    .unwrap();
session_ending = false;
if session_ending {
    state_machine.reinitialize();
    session_result_sender
        .send(Some(
            result_with_timings
                .take()
                .unwrap_or_else(initialized_result_with_timings),
        ))
        .unwrap();
    session_ending = false;
}
}
}
};
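The comments in the hunk above describe the `never()`/`disconnected()` dummy-receiver trick used to surface a non-channel condition (`state_machine.has_unblocked_task()`) to `select!` at a fixed precedence. Below is a hedged, self-contained sketch of that same pattern outside the scheduler; all names are illustrative, not the scheduler's.

```rust
// Minimal sketch of the dummy-receiver pattern: a boolean condition picks between an
// always-disconnected receiver (always ready, yielding RecvError) and a never receiver
// (never ready), so the condition participates in select! as an ordinary arm.
use crossbeam_channel::{never, select, unbounded, Receiver, RecvError};

fn disconnected<T>() -> Receiver<T> {
    // Dropping the sender at .0 leaves a permanently disconnected receiver.
    unbounded().1
}

fn main() {
    let (do_now, dont_now) = (&disconnected::<()>(), &never::<()>());
    let dummy_receiver = |trigger: bool| if trigger { do_now } else { dont_now };

    let (real_sender, real_receiver) = unbounded::<u64>();
    real_sender.send(42).unwrap();

    let has_pending_work = false; // imagine this came from some non-channel state machine
    select! {
        recv(dummy_receiver(has_pending_work)) -> msg => {
            // Reachable only when has_pending_work is true; the recv itself always
            // "fails" with RecvError because the channel is disconnected.
            assert_eq!(msg, Err(RecvError));
        },
        recv(real_receiver) -> msg => {
            assert_eq!(msg, Ok(42));
        },
    }
}
```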
@@ -741,7 +809,9 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
}

fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) {
let task = Task::create_task(transaction.clone(), index);
let task = SchedulingStateMachine::create_task(transaction.clone(), index, &mut |pubkey| {
    self.inner.usage_queue_loader.load(pubkey)
});
self.inner.thread_manager.send_task(task);
}
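Putting the pieces together, here is a hedged sketch of the intended task flow, based on the call sites above (the closure argument to `create_task` and the `schedule_task`/`deschedule_task` pair in the scheduler thread). The signatures are inferred from those call sites, not quoted from the crate.

```rust
use {
    solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction},
    solana_unified_scheduler_logic::{SchedulingStateMachine, Task, UsageQueue},
};

// `load_usage_queue` stands in for UsageQueueLoader::load; `hand_off_to_handler` for the
// channel send to a handler thread.
fn drive_one_task(
    state_machine: &mut SchedulingStateMachine,
    transaction: &SanitizedTransaction,
    index: usize,
    load_usage_queue: &mut impl FnMut(Pubkey) -> UsageQueue,
    hand_off_to_handler: &mut impl FnMut(Task),
) {
    // Every account address of the transaction is resolved to its shared UsageQueue.
    let task = SchedulingStateMachine::create_task(transaction.clone(), index, load_usage_queue);

    // Some(task) means all account locks were taken immediately; None means the task is
    // blocked behind earlier conflicting tasks and will come back later via
    // schedule_next_unblocked_task() after deschedule_task() releases those locks.
    if let Some(task) = state_machine.schedule_task(task) {
        hand_off_to_handler(task);
    }
}
```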
@@ -1020,7 +1090,7 @@ mod tests {
    .result,
    Ok(_)
);
scheduler.schedule_execution(&(good_tx_after_bad_tx, 0));
scheduler.schedule_execution(&(good_tx_after_bad_tx, 1));
scheduler.pause_for_recent_blockhash();
// transaction_count should remain same as scheduler should be bailing out.
// That's because we're testing the serialized failing execution case in this test.
@@ -1244,4 +1314,42 @@ mod tests {
    4
);
}

// See comment in SchedulingStateMachine::create_task() for the justification of this test
#[test]
fn test_enfoced_get_account_locks_validation() {
    solana_logger::setup();

    let GenesisConfigInfo {
        genesis_config,
        ref mint_keypair,
        ..
    } = create_genesis_config(10_000);
    let bank = Bank::new_for_tests(&genesis_config);
    let bank = &setup_dummy_fork_graph(bank);

    let mut tx = system_transaction::transfer(
        mint_keypair,
        &solana_sdk::pubkey::new_rand(),
        2,
        genesis_config.hash(),
    );
    // mangle the transfer tx to try to lock fee_payer (= mint_keypair) address twice!
    tx.message.account_keys.push(tx.message.account_keys[0]);
    let tx = &SanitizedTransaction::from_transaction_for_tests(tx);

    // this internally should call SanitizedTransaction::get_account_locks().
    let result = &mut Ok(());
    let timings = &mut ExecuteTimings::default();
    let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
    let handler_context = &HandlerContext {
        log_messages_bytes_limit: None,
        transaction_status_sender: None,
        replay_vote_sender: None,
        prioritization_fee_cache,
    };

    DefaultTaskHandler::handle(result, timings, bank, tx, 0, handler_context);
    assert_matches!(result, Err(TransactionError::AccountLoadedTwice));
}
}