Add --unified-scheduler-handler-threads (#35195)

* Add --unified-scheduler-handler-threads

* Adjust value name

* Warn if the flag was ignored

* Tweak message a bit
Ryo Onodera 2024-02-22 09:05:17 +09:00 committed by GitHub
parent 537c3d8e2c
commit 024d6ecc4f
10 changed files with 120 additions and 21 deletions
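
Taken together, the diffs below thread an optional handler-thread count from the command line into the unified scheduler pool: the value is forwarded to DefaultSchedulerPool when the unified scheduler is the selected block-verification method, and only warned about (not rejected) otherwise. The following is a minimal, self-contained Rust sketch of that pattern; the enum, function, and logging calls are hypothetical stand-ins, not the validator's actual types.

    // Hypothetical stand-ins illustrating the flag's semantics; the real code lives in
    // Validator::new() and load_and_process_ledger() below.
    enum BlockVerificationMethod {
        BlockstoreProcessor,
        UnifiedScheduler,
    }

    fn install_scheduler(method: BlockVerificationMethod, handler_threads: Option<usize>) {
        match method {
            BlockVerificationMethod::BlockstoreProcessor => {
                // The flag is accepted but has no effect without the unified scheduler.
                if let Some(count) = handler_threads {
                    eprintln!(
                        "--unified-scheduler-handler-threads={count} is ignored because \
                         unified scheduler isn't enabled"
                    );
                }
            }
            BlockVerificationMethod::UnifiedScheduler => {
                // In the real code this is DefaultSchedulerPool::new_dyn(handler_threads, ...),
                // which at this commit still pins the count to 1 (see the assert in new()).
                let count = handler_threads.unwrap_or(1);
                println!("spawning {count} handler thread(s) per scheduler");
            }
        }
    }

    fn main() {
        install_scheduler(BlockVerificationMethod::BlockstoreProcessor, Some(8));
        install_scheduler(BlockVerificationMethod::UnifiedScheduler, None);
    }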

Cargo.lock (generated)

@@ -7547,6 +7547,7 @@ dependencies = [
"solana-svm",
"solana-test-validator",
"solana-tpu-client",
"solana-unified-scheduler-pool",
"solana-version",
"solana-vote-program",
"spl-token-2022",


@@ -262,6 +262,7 @@ pub struct ValidatorConfig {
pub generator_config: Option<GeneratorConfig>,
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
pub wen_restart_proto_path: Option<PathBuf>,
pub unified_scheduler_handler_threads: Option<usize>,
}
impl Default for ValidatorConfig {
@@ -329,6 +330,7 @@ impl Default for ValidatorConfig {
generator_config: None,
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
wen_restart_proto_path: None,
unified_scheduler_handler_threads: None,
}
}
}
@@ -813,9 +815,16 @@ impl Validator {
match &config.block_verification_method {
BlockVerificationMethod::BlockstoreProcessor => {
info!("no scheduler pool is installed for block verification...");
if let Some(count) = config.unified_scheduler_handler_threads {
warn!(
"--unified-scheduler-handler-threads={count} is ignored because unified \
scheduler isn't enabled"
);
}
}
BlockVerificationMethod::UnifiedScheduler => {
let scheduler_pool = DefaultSchedulerPool::new_dyn(
config.unified_scheduler_handler_threads,
config.runtime_config.log_messages_bytes_limit,
transaction_status_sender.clone(),
Some(replay_vote_sender.clone()),


@@ -291,9 +291,17 @@ pub fn load_and_process_ledger(
"Using: block-verification-method: {}",
block_verification_method,
);
let unified_scheduler_handler_threads =
value_t!(arg_matches, "unified_scheduler_handler_threads", usize).ok();
match block_verification_method {
BlockVerificationMethod::BlockstoreProcessor => {
info!("no scheduler pool is installed for block verification...");
if let Some(count) = unified_scheduler_handler_threads {
warn!(
"--unified-scheduler-handler-threads={count} is ignored because unified \
scheduler isn't enabled"
);
}
}
BlockVerificationMethod::UnifiedScheduler => {
let no_transaction_status_sender = None;
@@ -303,6 +311,7 @@ pub fn load_and_process_ledger(
.write()
.unwrap()
.install_scheduler_pool(DefaultSchedulerPool::new_dyn(
unified_scheduler_handler_threads,
process_options.runtime_config.log_messages_bytes_limit,
no_transaction_status_sender,
no_replay_vote_sender,
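
For reference, value_t!(arg_matches, "unified_scheduler_handler_threads", usize).ok() yields Some(n) only when the flag was supplied and parses as a usize; an absent flag simply becomes None. A rough, standalone sketch in the clap 2 style used here, not ledger-tool's actual argument setup:

    use clap::{value_t, App, Arg};

    fn main() {
        let matches = App::new("demo")
            .arg(
                Arg::with_name("unified_scheduler_handler_threads")
                    .long("unified-scheduler-handler-threads")
                    .value_name("COUNT")
                    .takes_value(true),
            )
            .get_matches_from(vec!["demo", "--unified-scheduler-handler-threads", "8"]);

        // `.ok()` collapses "flag not given" (and any parse failure) into None.
        let handler_threads = value_t!(matches, "unified_scheduler_handler_threads", usize).ok();
        assert_eq!(handler_threads, Some(8));
    }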


@@ -28,7 +28,7 @@ use {
input_parsers::{cluster_type_of, pubkey_of, pubkeys_of},
input_validators::{
is_parsable, is_pow2, is_pubkey, is_pubkey_or_keypair, is_slot, is_valid_percentage,
validate_maximum_full_snapshot_archives_to_retain,
is_within_range, validate_maximum_full_snapshot_archives_to_retain,
validate_maximum_incremental_snapshot_archives_to_retain,
},
},
@@ -72,6 +72,7 @@ use {
transaction::{MessageHash, SanitizedTransaction, SimpleAddressLoader},
},
solana_stake_program::stake_state::{self, PointValue},
solana_unified_scheduler_pool::DefaultSchedulerPool,
solana_vote_program::{
self,
vote_state::{self, VoteState},
@@ -852,6 +853,16 @@ fn main() {
.hidden(hidden_unless_forced())
.help(BlockVerificationMethod::cli_message()),
)
.arg(
Arg::with_name("unified_scheduler_handler_threads")
.long("unified-scheduler-handler-threads")
.value_name("COUNT")
.takes_value(true)
.validator(|s| is_within_range(s, 1..))
.global(true)
.hidden(hidden_unless_forced())
.help(DefaultSchedulerPool::cli_message()),
)
.arg(
Arg::with_name("output_format")
.long("output")


@@ -68,6 +68,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
generator_config: config.generator_config.clone(),
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
}
}


@@ -6546,6 +6546,7 @@ dependencies = [
"solana-svm",
"solana-test-validator",
"solana-tpu-client",
"solana-unified-scheduler-pool",
"solana-version",
"solana-vote-program",
"symlink",


@@ -34,7 +34,7 @@ use {
marker::PhantomData,
sync::{
atomic::{AtomicU64, Ordering::Relaxed},
Arc, Mutex, Weak,
Arc, Mutex, OnceLock, Weak,
},
thread::{self, JoinHandle},
},
@@ -48,6 +48,7 @@ type AtomicSchedulerId = AtomicU64;
#[derive(Debug)]
pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
scheduler_inners: Mutex<Vec<S::Inner>>,
handler_count: usize,
handler_context: HandlerContext,
// weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to
// Arc<Self> from &Self, because SchedulerPool is used as in the form of Arc<SchedulerPool>
@@ -83,13 +84,20 @@ where
// Some internal impl and test code want an actual concrete type, NOT the
// `dyn InstalledSchedulerPool`. So don't merge this into `Self::new_dyn()`.
fn new(
handler_count: Option<usize>,
log_messages_bytes_limit: Option<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> Arc<Self> {
let handler_count = handler_count.unwrap_or(1);
// we're hard-coding the number of handler threads to 1, meaning this impl is currently
// single-threaded still.
assert_eq!(handler_count, 1); // replace this with assert!(handler_count >= 1) later
Arc::new_cyclic(|weak_self| Self {
scheduler_inners: Mutex::default(),
handler_count,
handler_context: HandlerContext {
log_messages_bytes_limit,
transaction_status_sender,
@@ -105,12 +113,14 @@ where
// This apparently-meaningless wrapper is handy, because some callers explicitly want
// `dyn InstalledSchedulerPool` to be returned for type inference convenience.
pub fn new_dyn(
handler_count: Option<usize>,
log_messages_bytes_limit: Option<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> InstalledSchedulerPoolArc {
Self::new(
handler_count,
log_messages_bytes_limit,
transaction_status_sender,
replay_vote_sender,
@@ -145,6 +155,37 @@ where
S::spawn(self.self_arc(), context)
}
}
pub fn default_handler_count() -> usize {
Self::calculate_default_handler_count(
thread::available_parallelism()
.ok()
.map(|non_zero| non_zero.get()),
)
}
pub fn calculate_default_handler_count(detected_cpu_core_count: Option<usize>) -> usize {
// Divide by 4 so that handler threads don't consume all available CPUs, sparing some for
// other active forks and other subsystems.
// Also, if available_parallelism fails (which should be very rare), use 4 threads,
// as a relatively conservative assumption about modern multi-core systems ranging from
// engineers' laptops to production servers.
detected_cpu_core_count
.map(|core_count| (core_count / 4).max(1))
.unwrap_or(4)
}
pub fn cli_message() -> &'static str {
static MESSAGE: OnceLock<String> = OnceLock::new();
MESSAGE.get_or_init(|| {
format!(
"Change the number of the unified scheduler's transaction execution threads \
dedicated to each block, otherwise calculated as cpu_cores/4 [default: {}]",
Self::default_handler_count()
)
})
}
}
impl<S, TH> InstalledSchedulerPool for SchedulerPool<S, TH>
@@ -372,7 +413,6 @@ pub struct PooledSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> {
struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
scheduler_id: SchedulerId,
pool: Arc<SchedulerPool<S, TH>>,
handler_count: usize,
new_task_sender: Sender<NewTaskPayload>,
new_task_receiver: Receiver<NewTaskPayload>,
session_result_sender: Sender<Option<ResultWithTimings>>,
@@ -384,13 +424,9 @@ struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
impl<TH: TaskHandler> PooledScheduler<TH> {
fn do_spawn(pool: Arc<SchedulerPool<Self, TH>>, initial_context: SchedulingContext) -> Self {
// we're hard-coding the number of handler thread to 1, meaning this impl is currently
// single-threaded still.
let handler_count = 1;
Self::from_inner(
PooledSchedulerInner::<Self, TH> {
thread_manager: ThreadManager::new(pool, handler_count),
thread_manager: ThreadManager::new(pool),
},
initial_context,
)
@@ -398,14 +434,14 @@ impl<TH: TaskHandler> PooledScheduler<TH> {
}
impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
fn new(pool: Arc<SchedulerPool<S, TH>>, handler_count: usize) -> Self {
fn new(pool: Arc<SchedulerPool<S, TH>>) -> Self {
let (new_task_sender, new_task_receiver) = unbounded();
let (session_result_sender, session_result_receiver) = unbounded();
let handler_count = pool.handler_count;
Self {
scheduler_id: pool.new_scheduler_id(),
pool,
handler_count,
new_task_sender,
new_task_receiver,
session_result_sender,
@@ -477,7 +513,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// 5. the handler thread replies back to the scheduler thread with an executed task.
// 6. the scheduler thread post-processes the executed task.
let scheduler_main_loop = || {
let handler_count = self.handler_count;
let handler_count = self.pool.handler_count;
let session_result_sender = self.session_result_sender.clone();
let new_task_receiver = self.new_task_receiver.clone();
@@ -613,7 +649,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
.unwrap(),
);
self.handler_threads = (0..self.handler_count)
self.handler_threads = (0..self.pool.handler_count)
.map({
|thx| {
thread::Builder::new()
@@ -760,7 +796,7 @@ mod tests {
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
// this indirectly proves that there should be a circular link because there's only one Arc
// at this moment
@@ -775,7 +811,7 @@ mod tests {
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
let bank = Arc::new(Bank::default_for_tests());
let context = SchedulingContext::new(bank);
let scheduler = pool.take_scheduler(context);
@@ -789,7 +825,8 @@ mod tests {
solana_logger::setup();
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
let pool =
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
let bank = Arc::new(Bank::default_for_tests());
let context = &SchedulingContext::new(bank);
@@ -817,7 +854,8 @@ mod tests {
solana_logger::setup();
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
let pool =
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
let bank = Arc::new(Bank::default_for_tests());
let context = &SchedulingContext::new(bank);
let mut scheduler = pool.do_take_scheduler(context.clone());
@@ -835,7 +873,8 @@ mod tests {
solana_logger::setup();
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
let pool =
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
let old_bank = &Arc::new(Bank::default_for_tests());
let new_bank = &Arc::new(Bank::default_for_tests());
assert!(!Arc::ptr_eq(old_bank, new_bank));
@@ -861,7 +900,7 @@ mod tests {
let mut bank_forks = bank_forks.write().unwrap();
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
bank_forks.install_scheduler_pool(pool);
}
@@ -875,7 +914,7 @@ mod tests {
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
let bank = Bank::default_for_tests();
let bank_forks = BankForks::new_rw_arc(bank);
@@ -928,7 +967,7 @@ mod tests {
let bank = setup_dummy_fork_graph(bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
let context = SchedulingContext::new(bank.clone());
assert_eq!(bank.transaction_count(), 0);
@@ -953,7 +992,7 @@ mod tests {
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let pool =
DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
let context = SchedulingContext::new(bank.clone());
let mut scheduler = pool.take_scheduler(context);
@@ -1159,6 +1198,7 @@ mod tests {
None,
None,
None,
None,
ignored_prioritization_fee_cache,
);
let scheduler = pool.take_scheduler(context);
@@ -1193,4 +1233,18 @@ mod tests {
fn test_scheduler_schedule_execution_recent_blockhash_edge_case_without_race() {
do_test_scheduler_schedule_execution_recent_blockhash_edge_case::<false>();
}
#[test]
fn test_default_handler_count() {
for (detected, expected) in [(32, 8), (4, 1), (2, 1)] {
assert_eq!(
DefaultSchedulerPool::calculate_default_handler_count(Some(detected)),
expected
);
}
assert_eq!(
DefaultSchedulerPool::calculate_default_handler_count(None),
4
);
}
}
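
Two pieces of the pool code above are worth isolating: the default sizing heuristic (a quarter of the detected cores, at least 1, falling back to 4 when detection fails) and the OnceLock-cached help string that lets cli_message() return a &'static str built at runtime. A self-contained sketch of both, using standalone functions rather than the pool's methods:

    use std::{sync::OnceLock, thread};

    // Mirrors calculate_default_handler_count(): cores / 4, clamped to at least 1,
    // or 4 when core detection fails.
    fn default_handler_count() -> usize {
        thread::available_parallelism()
            .ok()
            .map(|cores| (cores.get() / 4).max(1))
            .unwrap_or(4)
    }

    // Mirrors the cli_message() pattern: format the help text once, cache it in a
    // static OnceLock<String>, and return a &'static str borrowed from it.
    fn cli_message() -> &'static str {
        static MESSAGE: OnceLock<String> = OnceLock::new();
        MESSAGE.get_or_init(|| {
            format!(
                "Change the number of handler threads [default: {}]",
                default_handler_count()
            )
        })
    }

    fn main() {
        // On a 32-core machine this prints a default of 8; on a 4-core laptop, 1.
        println!("{}", cli_message());
    }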


@@ -61,6 +61,7 @@ solana-streamer = { workspace = true }
solana-svm = { workspace = true }
solana-test-validator = { workspace = true }
solana-tpu-client = { workspace = true }
solana-unified-scheduler-pool = { workspace = true }
solana-version = { workspace = true }
solana-vote-program = { workspace = true }
symlink = { workspace = true }


@@ -47,6 +47,7 @@ use {
self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE,
},
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
solana_unified_scheduler_pool::DefaultSchedulerPool,
std::{path::PathBuf, str::FromStr},
};
@@ -1530,6 +1531,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.possible_values(BlockProductionMethod::cli_names())
.help(BlockProductionMethod::cli_message()),
)
.arg(
Arg::with_name("unified_scheduler_handler_threads")
.long("unified-scheduler-handler-threads")
.hidden(hidden_unless_forced())
.value_name("COUNT")
.takes_value(true)
.validator(|s| is_within_range(s, 1..))
.help(DefaultSchedulerPool::cli_message()),
)
.arg(
Arg::with_name("wen_restart")
.long("wen-restart")


@@ -1671,6 +1671,8 @@ pub fn main() {
BlockProductionMethod
)
.unwrap_or_default();
validator_config.unified_scheduler_handler_threads =
value_t!(matches, "unified_scheduler_handler_threads", usize).ok();
validator_config.ledger_column_options = LedgerColumnOptions {
compression_type: match matches.value_of("rocksdb_ledger_compression") {