Introduce primitive threading in unified scheduler (#34676)

* Introduce primitive threading in unified scheduler

* Make the internal struct ExecutedTask not pub

* Improve wording a bit

* Explain scheduler main loop's overhead sensitivity

* Improve wording a bit

* Define ChainedChannel{Sender, Receiver} wrappers

* Clean up a bit

* Use derivative to avoid manual Clone impl

* Clarify comment

* Remove extra whitespace in comment

* Remove unneeded dyn trait for ChainedChannel

* Remove the accumulator thread for now

* Fix typo

* Use unimplemented!() to convey intention better
Ryo Onodera 2024-01-24 12:46:16 +09:00 committed by GitHub
parent bfbe03a536
commit bd103865df
7 changed files with 535 additions and 60 deletions

Cargo.lock (generated)

@ -7534,12 +7534,18 @@ dependencies = [
[[package]]
name = "solana-unified-scheduler-logic"
version = "1.18.0"
dependencies = [
"solana-sdk",
]
[[package]]
name = "solana-unified-scheduler-pool"
version = "1.18.0"
dependencies = [
"assert_matches",
"crossbeam-channel",
"derivative",
"log",
"solana-ledger",
"solana-logger",
"solana-program-runtime",

Cargo.toml

@ -185,6 +185,7 @@ ctrlc = "3.4.2"
curve25519-dalek = "3.2.1"
dashmap = "5.5.3"
derivation-path = { version = "0.2.0", default-features = false }
derivative = "2.2.0"
dialoguer = "0.10.4"
digest = "0.10.7"
dir-diff = "0.3.3"

programs/sbf/Cargo.lock (generated)

@ -6534,11 +6534,18 @@ dependencies = [
[[package]]
name = "solana-unified-scheduler-logic"
version = "1.18.0"
dependencies = [
"solana-sdk",
]
[[package]]
name = "solana-unified-scheduler-pool"
version = "1.18.0"
dependencies = [
"assert_matches",
"crossbeam-channel",
"derivative",
"log",
"solana-ledger",
"solana-program-runtime",
"solana-runtime",

unified-scheduler-logic/Cargo.toml

@ -8,3 +8,6 @@ repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }
[dependencies]
solana-sdk = { workspace = true }

unified-scheduler-logic/src/lib.rs

@ -1 +1,20 @@
// This file will be populated with actual implementation later.
use solana_sdk::transaction::SanitizedTransaction;
pub struct Task {
transaction: SanitizedTransaction,
index: usize,
}
impl Task {
pub fn create_task(transaction: SanitizedTransaction, index: usize) -> Self {
Task { transaction, index }
}
pub fn task_index(&self) -> usize {
self.index
}
pub fn transaction(&self) -> &SanitizedTransaction {
&self.transaction
}
}

unified-scheduler-pool/Cargo.toml

@ -10,6 +10,10 @@ license = { workspace = true }
edition = { workspace = true }
[dependencies]
assert_matches = { workspace = true }
crossbeam-channel = { workspace = true }
derivative = { workspace = true }
log = { workspace = true }
solana-ledger = { workspace = true }
solana-program-runtime = { workspace = true }
solana-runtime = { workspace = true }

unified-scheduler-pool/src/lib.rs

@ -9,6 +9,10 @@
//! `solana-ledger`'s helper function called `execute_batch()`.
use {
assert_matches::assert_matches,
crossbeam_channel::{select, unbounded, Receiver, SendError, Sender},
derivative::Derivative,
log::*,
solana_ledger::blockstore_processor::{
execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
},
@ -23,6 +27,7 @@ use {
prioritization_fee_cache::PrioritizationFeeCache,
},
solana_sdk::transaction::{Result, SanitizedTransaction},
solana_unified_scheduler_logic::Task,
solana_vote::vote_sender_types::ReplayVoteSender,
std::{
fmt::Debug,
@ -31,6 +36,7 @@ use {
atomic::{AtomicU64, Ordering::Relaxed},
Arc, Mutex, Weak,
},
thread::{self, JoinHandle},
},
};
@ -194,6 +200,155 @@ impl TaskHandler for DefaultTaskHandler {
}
}
struct ExecutedTask {
task: Task,
result_with_timings: ResultWithTimings,
}
impl ExecutedTask {
fn new_boxed(task: Task) -> Box<Self> {
Box::new(Self {
task,
result_with_timings: initialized_result_with_timings(),
})
}
}
// A very tiny generic message type to signal the opening and closing of subchannels, which are
// logically segmented series of Payloads (P1) over a single continuous time-span, potentially
// carrying some subchannel metadata (P2) upon opening a new subchannel.
// Note that the above properties can be upheld only when this is used inside MPSC or SPSC channels
// (i.e. the consumer side needs to be single-threaded). For multiple-consumer cases,
// ChainedChannel can be used instead.
enum SubchanneledPayload<P1, P2> {
Payload(P1),
OpenSubchannel(P2),
CloseSubchannel,
}
type NewTaskPayload = SubchanneledPayload<Task, SchedulingContext>;
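// In this commit, the three variants map onto the session lifecycle driven by ThreadManager
// below: send_task() emits Payload(task), start_session() emits OpenSubchannel(context), and
// end_session() emits CloseSubchannel.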
// A tiny generic message type to synchronize multiple threads every time some contextual data
// needs to be switched (i.e. SchedulingContext), just using a single communication channel.
//
// Usually, there's no way to prevent one of those threads from mixing current and next contexts
// while processing messages with a multiple-consumer channel. A condvar or other
// out-of-band mechanism is needed to notify about the switching of contextual data. That's because
// there's no way to block those threads reliably on such a switching event just with a channel.
//
// However, if the number of consumers can be determined, this can be accomplished just over a
// single channel, which even carries an in-band control meta-message with the contexts. The trick
// is that as many identical meta-messages as there are threads are sent over the channel,
// along with the new channel receivers to be used (hence the name _chained_). Then, each receiving
// thread drops the old channel and is now blocked on receiving from the new channel. In this way,
// the switching can happen exactly once for each thread.
//
// Overall, this greatly simplifies the code, and reduces the CAS/syscall overhead per message to
// the minimum at the cost of a single channel recreation per switching. Needless to say, such an
// allocation can be amortized to be negligible.
mod chained_channel {
use super::*;
// hide variants by putting this inside newtype
enum ChainedChannelPrivate<P, C> {
Payload(P),
ContextAndChannel(C, Receiver<ChainedChannel<P, C>>),
}
pub(super) struct ChainedChannel<P, C>(ChainedChannelPrivate<P, C>);
impl<P, C> ChainedChannel<P, C> {
fn chain_to_new_channel(context: C, receiver: Receiver<Self>) -> Self {
Self(ChainedChannelPrivate::ContextAndChannel(context, receiver))
}
}
pub(super) struct ChainedChannelSender<P, C> {
sender: Sender<ChainedChannel<P, C>>,
}
impl<P, C: Clone> ChainedChannelSender<P, C> {
fn new(sender: Sender<ChainedChannel<P, C>>) -> Self {
Self { sender }
}
pub(super) fn send_payload(
&self,
payload: P,
) -> std::result::Result<(), SendError<ChainedChannel<P, C>>> {
self.sender
.send(ChainedChannel(ChainedChannelPrivate::Payload(payload)))
}
pub(super) fn send_chained_channel(
&mut self,
context: C,
count: usize,
) -> std::result::Result<(), SendError<ChainedChannel<P, C>>> {
let (chained_sender, chained_receiver) = crossbeam_channel::unbounded();
for _ in 0..count {
self.sender.send(ChainedChannel::chain_to_new_channel(
context.clone(),
chained_receiver.clone(),
))?
}
self.sender = chained_sender;
Ok(())
}
}
// P doesn't need to be `: Clone`, yet rustc derive can't handle it.
// see https://github.com/rust-lang/rust/issues/26925
#[derive(Derivative)]
#[derivative(Clone(bound = "C: Clone"))]
pub(super) struct ChainedChannelReceiver<P, C: Clone> {
receiver: Receiver<ChainedChannel<P, C>>,
context: C,
}
impl<P, C: Clone> ChainedChannelReceiver<P, C> {
fn new(receiver: Receiver<ChainedChannel<P, C>>, initial_context: C) -> Self {
Self {
receiver,
context: initial_context,
}
}
pub(super) fn context(&self) -> &C {
&self.context
}
pub(super) fn for_select(&self) -> &Receiver<ChainedChannel<P, C>> {
&self.receiver
}
pub(super) fn after_select(&mut self, message: ChainedChannel<P, C>) -> Option<P> {
match message.0 {
ChainedChannelPrivate::Payload(payload) => Some(payload),
ChainedChannelPrivate::ContextAndChannel(context, channel) => {
self.context = context;
self.receiver = channel;
None
}
}
}
}
pub(super) fn unbounded<P, C: Clone>(
initial_context: C,
) -> (ChainedChannelSender<P, C>, ChainedChannelReceiver<P, C>) {
let (sender, receiver) = crossbeam_channel::unbounded();
(
ChainedChannelSender::new(sender),
ChainedChannelReceiver::new(receiver, initial_context),
)
}
}
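// A minimal usage sketch of the chained_channel pair above, assuming a plain String payload and a
// usize context purely for illustration; the real scheduler wires Task and SchedulingContext
// through it instead.
#[cfg(test)]
mod chained_channel_sketch {
    use super::chained_channel;

    #[test]
    fn payloads_then_context_switch() {
        let (mut sender, mut receiver) = chained_channel::unbounded::<String, usize>(0);

        // One payload under the initial context, a context switch addressed to the single
        // consumer, then another payload under the new context.
        sender.send_payload("a".to_string()).unwrap();
        sender.send_chained_channel(1, 1).unwrap();
        sender.send_payload("b".to_string()).unwrap();

        let mut seen = vec![];
        while seen.len() < 2 {
            let message = receiver.for_select().recv().unwrap();
            // after_select() either yields a payload, or transparently swaps in the new context
            // and the new (chained) receiver and returns None.
            if let Some(payload) = receiver.after_select(message) {
                seen.push((payload, *receiver.context()));
            }
        }
        assert_eq!(seen, vec![("a".to_string(), 0), ("b".to_string(), 1)]);
    }
}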
fn initialized_result_with_timings() -> ResultWithTimings {
(Ok(()), ExecuteTimings::default())
}
// Currently, simplest possible implementation (i.e. single-threaded)
// this will be replaced with more proper implementation...
// not usable at all, especially for mainnet-beta
@ -201,27 +356,306 @@ impl TaskHandler for DefaultTaskHandler {
pub struct PooledScheduler<TH: TaskHandler> {
inner: PooledSchedulerInner<Self, TH>,
context: SchedulingContext,
result_with_timings: Mutex<ResultWithTimings>,
}
#[derive(Debug)]
pub struct PooledSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> {
id: SchedulerId,
thread_manager: ThreadManager<S, TH>,
}
// This type manages the OS threads for scheduling and executing transactions. The term
// `session` is consistently used to mean a group of Tasks scoped under a single SchedulingContext.
// This is equivalent to a particular bank for block verification. However, a new term is
// introduced here because, for block production (planned to be implemented in the future), a
// session will instead span some continuous stretch of time over multiple contiguous banks/slots.
#[derive(Debug)]
struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
scheduler_id: SchedulerId,
pool: Arc<SchedulerPool<S, TH>>,
handler_count: usize,
new_task_sender: Sender<NewTaskPayload>,
new_task_receiver: Receiver<NewTaskPayload>,
session_result_sender: Sender<Option<ResultWithTimings>>,
session_result_receiver: Receiver<Option<ResultWithTimings>>,
session_result_with_timings: Option<ResultWithTimings>,
scheduler_thread: Option<JoinHandle<()>>,
handler_threads: Vec<JoinHandle<()>>,
}
impl<TH: TaskHandler> PooledScheduler<TH> {
fn do_spawn(pool: Arc<SchedulerPool<Self, TH>>, initial_context: SchedulingContext) -> Self {
// we're hard-coding the number of handler threads to 1, meaning this impl is still
// effectively single-threaded.
let handler_count = 1;
Self::from_inner(
PooledSchedulerInner::<Self, TH> {
id: pool.new_scheduler_id(),
pool,
thread_manager: ThreadManager::new(pool, handler_count),
},
initial_context,
)
}
}
impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
fn new(pool: Arc<SchedulerPool<S, TH>>, handler_count: usize) -> Self {
let (new_task_sender, new_task_receiver) = unbounded();
let (session_result_sender, session_result_receiver) = unbounded();
Self {
scheduler_id: pool.new_scheduler_id(),
pool,
handler_count,
new_task_sender,
new_task_receiver,
session_result_sender,
session_result_receiver,
session_result_with_timings: None,
scheduler_thread: None,
handler_threads: Vec::with_capacity(handler_count),
}
}
fn execute_task_with_handler(
bank: &Arc<Bank>,
executed_task: &mut Box<ExecutedTask>,
handler_context: &HandlerContext,
) {
debug!("handling task at {:?}", thread::current());
TH::handle(
&mut executed_task.result_with_timings.0,
&mut executed_task.result_with_timings.1,
bank,
executed_task.task.transaction(),
executed_task.task.task_index(),
handler_context,
);
}
fn accumulate_result_with_timings(
(result, timings): &mut ResultWithTimings,
executed_task: Box<ExecutedTask>,
) {
match executed_task.result_with_timings.0 {
Ok(()) => {}
Err(error) => {
error!("error is detected while accumulating....: {error:?}");
// Override errors intentionally for simplicity, not retaining the
// first error unlike the block verification in the
// blockstore_processor. This will be addressed with more
// full-fledged impl later.
*result = Err(error);
}
}
timings.accumulate(&executed_task.result_with_timings.1);
}
fn take_session_result_with_timings(&mut self) -> ResultWithTimings {
self.session_result_with_timings.take().unwrap()
}
fn put_session_result_with_timings(&mut self, result_with_timings: ResultWithTimings) {
assert_matches!(
self.session_result_with_timings
.replace(result_with_timings),
None
);
}
fn start_threads(&mut self, context: &SchedulingContext) {
let (mut runnable_task_sender, runnable_task_receiver) =
chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
let (finished_task_sender, finished_task_receiver) = unbounded::<Box<ExecutedTask>>();
let mut result_with_timings = self.session_result_with_timings.take();
// High-level flow of new tasks:
// 1. the replay stage thread sends a new task.
// 2. the scheduler thread accepts the task.
// 3. the scheduler thread dispatches the task after proper locking.
// 4. the handler thread processes the dispatched task.
// 5. the handler thread replies back to the scheduler thread with an executed task.
// 6. the scheduler thread post-processes the executed task.
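//
// In terms of the channels wired up by this ThreadManager, that flow looks roughly like this
// (a rough sketch; the session result path is only exercised at session end):
//
//   replay stage ----- new_task_sender: NewTaskPayload -------------------> scheduler thread
//   scheduler thread - runnable_task_sender: chained_channel<Task, _> ----> handler threads
//   handler threads -- finished_task_sender: Box<ExecutedTask> -----------> scheduler thread
//   scheduler thread - session_result_sender: Option<ResultWithTimings> --> end_session()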
let scheduler_main_loop = || {
let handler_count = self.handler_count;
let session_result_sender = self.session_result_sender.clone();
let new_task_receiver = self.new_task_receiver.clone();
let mut session_ending = false;
let mut active_task_count: usize = 0;
// Now, this is the main loop for the scheduler thread, which is a special beast.
//
// That's because it could be the most notable bottleneck of throughput in the future
// when there are ~100 handler threads. Unified scheduler's overall throughput is
// largely dependent on its ultra-low latency characteristic, which is the most
// important design goal of the scheduler in order to reduce the transaction
// confirmation latency for end users.
//
// Firstly, the scheduler thread must handle incoming messages from thread(s) owned by
// the replay stage or the banking stage. It also must handle incoming messages from
// the multi-threaded handlers. This heavily-multi-threaded whole processing load must
// be coped with by just the single-threaded scheduler, to attain ideal cpu cache
// friendliness and main memory bandwidth saturation with its shared-nothing
// single-threaded account locking implementation. In other words, the per-task
// processing efficiency of the main loop codifies the upper bound of horizontal
// scalability of the unified scheduler.
//
// Moreover, the scheduler is designed to handle tasks without batching at all in the
// pursuit of saturating all of the handler threads with maximally-fine-grained
// concurrency density for throughput as the second design goal. This design goal
// relies on the assumption that there's no considerable penalty arising from the
// unbatched manner of processing.
//
// Note that this assumption isn't true as of writing. The current code path
// underneath execute_batch() isn't optimized for unified scheduler's load pattern (i.e.
// batches with just a single transaction) at all. This will be addressed in the
// future.
//
// These two key elements of the design philosophy lead to the rather unforgiving
// implementation burden: Degraded performance would acutely manifest from even a tiny
// amount of individual cpu-bound processing delay in the scheduler thread, like when
// dispatching the next conflicting task after receiving the previous finished one from
// the handler.
//
// Thus, squeezing every last cpu cycle out of the scheduler thread is vital to the unified
// scheduler's advertised superiority. Any kind of unessential overhead source, like syscalls,
// VDSO, and even memory (de)allocation, should be avoided at all costs by design or, as a
// last resort, by means of offloading.
move || loop {
let mut is_finished = false;
while !is_finished {
select! {
recv(finished_task_receiver) -> executed_task => {
let executed_task = executed_task.unwrap();
active_task_count = active_task_count.checked_sub(1).unwrap();
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
},
recv(new_task_receiver) -> message => {
assert!(!session_ending);
match message.unwrap() {
NewTaskPayload::Payload(task) => {
// so, we're NOT scheduling at all here; rather, just execute the tx straight off. the
// inter-tx locking deps don't need to be resolved in the case of a single-threaded
// FIFO like this.
runnable_task_sender
.send_payload(task)
.unwrap();
active_task_count = active_task_count.checked_add(1).unwrap();
}
NewTaskPayload::OpenSubchannel(context) => {
// signal about new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(context, handler_count)
.unwrap();
assert_matches!(
result_with_timings.replace(initialized_result_with_timings()),
None
);
}
NewTaskPayload::CloseSubchannel => {
session_ending = true;
}
}
},
};
// a really simplistic termination condition, which only works under the
// assumption of a single handler thread...
is_finished = session_ending && active_task_count == 0;
}
if session_ending {
session_result_sender
.send(Some(
result_with_timings
.take()
.unwrap_or_else(initialized_result_with_timings),
))
.unwrap();
session_ending = false;
}
}
};
let handler_main_loop = || {
let pool = self.pool.clone();
let mut runnable_task_receiver = runnable_task_receiver.clone();
let finished_task_sender = finished_task_sender.clone();
move || loop {
let (task, sender) = select! {
recv(runnable_task_receiver.for_select()) -> message => {
if let Some(task) = runnable_task_receiver.after_select(message.unwrap()) {
(task, &finished_task_sender)
} else {
continue;
}
},
};
let mut task = ExecutedTask::new_boxed(task);
Self::execute_task_with_handler(
runnable_task_receiver.context().bank(),
&mut task,
&pool.handler_context,
);
sender.send(task).unwrap();
}
};
self.scheduler_thread = Some(
thread::Builder::new()
.name("solScheduler".to_owned())
.spawn(scheduler_main_loop())
.unwrap(),
);
self.handler_threads = (0..self.handler_count)
.map({
|thx| {
thread::Builder::new()
.name(format!("solScHandler{:02}", thx))
.spawn(handler_main_loop())
.unwrap()
}
})
.collect();
}
fn send_task(&self, task: Task) {
debug!("send_task()");
self.new_task_sender
.send(NewTaskPayload::Payload(task))
.unwrap()
}
fn end_session(&mut self) {
if self.session_result_with_timings.is_some() {
debug!("end_session(): already result resides within thread manager..");
return;
}
debug!("end_session(): will end session...");
self.new_task_sender
.send(NewTaskPayload::CloseSubchannel)
.unwrap();
if let Some(result_with_timings) = self.session_result_receiver.recv().unwrap() {
self.put_session_result_with_timings(result_with_timings);
}
}
fn start_session(&mut self, context: &SchedulingContext) {
assert_matches!(self.session_result_with_timings, None);
self.new_task_sender
.send(NewTaskPayload::OpenSubchannel(context.clone()))
.unwrap();
}
}
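// A rough sketch of how one full session is driven through the ThreadManager above, assuming the
// caller already has a pool, a scheduling context and some pre-built tasks at hand; the function
// name run_one_session_sketch is illustrative only, and it mirrors what spawn()/from_inner()/
// schedule_execution()/into_inner() below do for PooledScheduler (joining the spawned threads is
// omitted in this sketch).
#[cfg(test)]
#[allow(dead_code)]
fn run_one_session_sketch<TH: TaskHandler>(
    pool: Arc<SchedulerPool<PooledScheduler<TH>, TH>>,
    context: &SchedulingContext,
    tasks: Vec<Task>,
) -> ResultWithTimings {
    let mut manager = ThreadManager::<PooledScheduler<TH>, TH>::new(pool, 1);
    manager.start_session(context); // queues NewTaskPayload::OpenSubchannel(context)
    manager.start_threads(context); // spawns solScheduler and solScHandler00
    for task in tasks {
        manager.send_task(task); // NewTaskPayload::Payload(task)
    }
    manager.end_session(); // NewTaskPayload::CloseSubchannel; blocks for the session result
    manager.take_session_result_with_timings()
}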
pub trait SpawnableScheduler<TH: TaskHandler>: InstalledScheduler {
type Inner: Debug + Send + Sync;
@ -237,29 +671,33 @@ pub trait SpawnableScheduler<TH: TaskHandler>: InstalledScheduler {
impl<TH: TaskHandler> SpawnableScheduler<TH> for PooledScheduler<TH> {
type Inner = PooledSchedulerInner<Self, TH>;
fn into_inner(self) -> (ResultWithTimings, Self::Inner) {
(
self.result_with_timings.into_inner().expect("not poisoned"),
self.inner,
)
fn into_inner(mut self) -> (ResultWithTimings, Self::Inner) {
let result_with_timings = {
let manager = &mut self.inner.thread_manager;
manager.end_session();
manager.take_session_result_with_timings()
};
(result_with_timings, self.inner)
}
fn from_inner(inner: Self::Inner, context: SchedulingContext) -> Self {
Self {
inner,
context,
result_with_timings: Mutex::new((Ok(()), ExecuteTimings::default())),
}
fn from_inner(mut inner: Self::Inner, context: SchedulingContext) -> Self {
inner.thread_manager.start_session(&context);
Self { inner, context }
}
fn spawn(pool: Arc<SchedulerPool<Self, TH>>, initial_context: SchedulingContext) -> Self {
Self::do_spawn(pool, initial_context)
let mut scheduler = Self::do_spawn(pool, initial_context);
scheduler
.inner
.thread_manager
.start_threads(&scheduler.context);
scheduler
}
}
impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
fn id(&self) -> SchedulerId {
self.inner.id
self.inner.thread_manager.scheduler_id
}
fn context(&self) -> &SchedulingContext {
@ -267,23 +705,8 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
}
fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) {
let (result, timings) = &mut *self.result_with_timings.lock().expect("not poisoned");
if result.is_err() {
// just bail out early to short-circuit the processing altogether
return;
}
// ... so, we're NOT scheduling at all here; rather, just execute tx straight off. the
// inter-tx locking deps aren't needed to be resolved in the case of single-threaded FIFO
// like this.
TH::handle(
result,
timings,
self.context().bank(),
transaction,
index,
&self.inner.pool.handler_context,
);
let task = Task::create_task(transaction.clone(), index);
self.inner.thread_manager.send_task(task);
}
fn wait_for_termination(
@ -295,7 +718,7 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
}
fn pause_for_recent_blockhash(&mut self) {
// not surprisingly, there's nothing to do for this min impl!
self.inner.thread_manager.end_session();
}
}
@ -305,7 +728,7 @@ where
TH: TaskHandler,
{
fn return_to_pool(self: Box<Self>) {
self.pool.clone().return_scheduler(*self)
self.thread_manager.pool.clone().return_scheduler(*self)
}
}
@ -544,7 +967,8 @@ mod tests {
));
assert_eq!(bank.transaction_count(), 0);
scheduler.schedule_execution(&(bad_tx, 0));
scheduler.pause_for_recent_blockhash();
// simulate that the task-sending thread is stalled for some reason.
std::thread::sleep(std::time::Duration::from_secs(1));
assert_eq!(bank.transaction_count(), 0);
let good_tx_after_bad_tx =
@ -563,7 +987,13 @@ mod tests {
scheduler.schedule_execution(&(good_tx_after_bad_tx, 0));
scheduler.pause_for_recent_blockhash();
// transaction_count should remain same as scheduler should be bailing out.
assert_eq!(bank.transaction_count(), 0);
// That's because we're testing the serialized failing execution case in this test.
// However, the current threaded impl can't properly abort in this situation,
// so 1 should be observed, instead of 0.
// Also note that bank.transaction_count() is generally racy by nature, because
// blockstore_processor and unified_scheduler both tend to process non-conflicting batches
// in parallel as part of the normal operation.
assert_eq!(bank.transaction_count(), 1);
let bank = BankWithScheduler::new(bank, Some(scheduler));
assert_matches!(
@ -577,8 +1007,10 @@ mod tests {
#[derive(Debug)]
struct AsyncScheduler<const TRIGGER_RACE_CONDITION: bool>(
PooledScheduler<DefaultTaskHandler>,
Mutex<ResultWithTimings>,
Mutex<Vec<JoinHandle<ResultWithTimings>>>,
SchedulingContext,
Arc<SchedulerPool<Self, DefaultTaskHandler>>,
);
impl<const TRIGGER_RACE_CONDITION: bool> AsyncScheduler<TRIGGER_RACE_CONDITION> {
@ -593,7 +1025,7 @@ mod tests {
}
overall_timings.accumulate(&timings);
}
*self.0.result_with_timings.lock().unwrap() = (overall_result, overall_timings);
*self.0.lock().unwrap() = (overall_result, overall_timings);
}
}
@ -601,17 +1033,17 @@ mod tests {
for AsyncScheduler<TRIGGER_RACE_CONDITION>
{
fn id(&self) -> SchedulerId {
self.0.id()
unimplemented!();
}
fn context(&self) -> &SchedulingContext {
self.0.context()
&self.2
}
fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) {
let transaction_and_index = (transaction.clone(), index);
let context = self.context().clone();
let pool = self.0.inner.pool.clone();
let pool = self.3.clone();
self.1.lock().unwrap().push(std::thread::spawn(move || {
// intentionally sleep to simulate race condition where register_recent_blockhash
@ -635,10 +1067,14 @@ mod tests {
fn wait_for_termination(
self: Box<Self>,
is_dropped: bool,
_is_dropped: bool,
) -> (ResultWithTimings, UninstalledSchedulerBox) {
self.do_wait();
Box::new(self.0).wait_for_termination(is_dropped)
let result_with_timings = std::mem::replace(
&mut *self.0.lock().unwrap(),
initialized_result_with_timings(),
);
(result_with_timings, self)
}
fn pause_for_recent_blockhash(&mut self) {
@ -651,6 +1087,14 @@ mod tests {
}
}
impl<const TRIGGER_RACE_CONDITION: bool> UninstalledScheduler
for AsyncScheduler<TRIGGER_RACE_CONDITION>
{
fn return_to_pool(self: Box<Self>) {
self.3.clone().return_scheduler(*self)
}
}
impl<const TRIGGER_RACE_CONDITION: bool> SpawnableScheduler<DefaultTaskHandler>
for AsyncScheduler<TRIGGER_RACE_CONDITION>
{
@ -658,11 +1102,11 @@ mod tests {
type Inner = Self;
fn into_inner(self) -> (ResultWithTimings, Self::Inner) {
todo!();
unimplemented!();
}
fn from_inner(_inner: Self::Inner, _context: SchedulingContext) -> Self {
todo!();
unimplemented!();
}
fn spawn(
@ -670,19 +1114,10 @@ mod tests {
initial_context: SchedulingContext,
) -> Self {
AsyncScheduler::<TRIGGER_RACE_CONDITION>(
PooledScheduler::<DefaultTaskHandler>::from_inner(
PooledSchedulerInner {
id: pool.new_scheduler_id(),
pool: SchedulerPool::new(
pool.handler_context.log_messages_bytes_limit,
pool.handler_context.transaction_status_sender.clone(),
pool.handler_context.replay_vote_sender.clone(),
pool.handler_context.prioritization_fee_cache.clone(),
),
},
initial_context,
),
Mutex::new(initialized_result_with_timings()),
Mutex::new(vec![]),
initial_context,
pool,
)
}
}