Move default value for --rpc-pubsub-notification-threads to CLI (#158)

The default value was previously determined down where the thread
pool is created. Providing a default value at the CLI level is
consistent with the other args, and gives an operator better visibility
into what the default will actually be.
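As a rough illustration of the behavior described above (not the validator's actual code): the CLI default is only injected when --full-rpc-api is in effect, and a value of 0 parses to None via NonZeroUsize, which disables notifications. The binary name, the literal "8" default, and the clap 2-style wiring below are assumptions made for the sketch.

// Hypothetical, self-contained sketch (clap 2.x style); "example-validator" and the
// "8" default are illustrative and not part of this commit.
use {
    clap::{value_t, App, Arg},
    std::num::NonZeroUsize,
};

fn main() {
    let matches = App::new("example-validator")
        .arg(Arg::with_name("full_rpc_api").long("full-rpc-api"))
        .arg(
            Arg::with_name("rpc_pubsub_notification_threads")
                .long("rpc-pubsub-notification-threads")
                .takes_value(true)
                // The default only applies when --full-rpc-api is present; otherwise the
                // arg stays unset, mirroring the old `if full_api { .. } else { Some(0) }`.
                .default_value_if("full_rpc_api", None, "8"),
        )
        .get_matches();

    // Same shape as the new main.rs logic: an unset value or an explicit "0" both
    // collapse to None, which the subscriptions code treats as "spawn no notifier".
    let notification_threads = value_t!(matches, "rpc_pubsub_notification_threads", usize)
        .ok()
        .and_then(NonZeroUsize::new);
    println!("notification_threads = {notification_threads:?}");
}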
steviez 2024-03-12 16:11:44 -05:00 committed by GHA: Update Upstream From Fork
parent 9a8da98577
commit be38278281
6 changed files with 49 additions and 48 deletions

Cargo.lock generated
View File

@@ -7510,6 +7510,7 @@ dependencies = [
  "solana-perf",
  "solana-poh",
  "solana-program-runtime",
+ "solana-rayon-threadlimit",
  "solana-rpc",
  "solana-rpc-client",
  "solana-rpc-client-api",

View File

@@ -12,10 +12,12 @@ use {
     jsonrpc_core::IoHandler,
     soketto::handshake::{server, Server},
     solana_metrics::TokenCounter,
+    solana_rayon_threadlimit::get_thread_count,
     solana_sdk::timing::AtomicInterval,
     std::{
         io,
         net::SocketAddr,
+        num::NonZeroUsize,
         str,
         sync::{
             atomic::{AtomicU64, AtomicUsize, Ordering},
@@ -43,7 +45,7 @@ pub struct PubSubConfig {
     pub queue_capacity_items: usize,
     pub queue_capacity_bytes: usize,
     pub worker_threads: usize,
-    pub notification_threads: Option<usize>,
+    pub notification_threads: Option<NonZeroUsize>,
 }
 
 impl Default for PubSubConfig {
@@ -55,7 +57,7 @@ impl Default for PubSubConfig {
             queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS,
             queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
             worker_threads: DEFAULT_WORKER_THREADS,
-            notification_threads: None,
+            notification_threads: NonZeroUsize::new(get_thread_count()),
         }
     }
 }
@@ -69,7 +71,7 @@ impl PubSubConfig {
             queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS,
             queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
             worker_threads: DEFAULT_WORKER_THREADS,
-            notification_threads: Some(2),
+            notification_threads: NonZeroUsize::new(2),
         }
     }
 }

View File

@@ -19,7 +19,6 @@ use {
     solana_account_decoder::{parse_token::is_known_spl_token_id, UiAccount, UiAccountEncoding},
     solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path},
     solana_measure::measure::Measure,
-    solana_rayon_threadlimit::get_thread_count,
     solana_rpc_client_api::response::{
         ProcessedSignatureResult, ReceivedSignatureResult, Response as RpcResponse, RpcBlockUpdate,
         RpcBlockUpdateError, RpcKeyedAccount, RpcLogsResponse, RpcResponseContext,
@@ -631,41 +630,37 @@ impl RpcSubscriptions {
                 config.queue_capacity_bytes,
             )),
         };
-        let notification_threads = config.notification_threads.unwrap_or_else(get_thread_count);
-        let t_cleanup = if notification_threads == 0 {
-            None
-        } else {
+        let t_cleanup = config.notification_threads.map(|notification_threads| {
             let exit = exit.clone();
-            Some(
-                Builder::new()
-                    .name("solRpcNotifier".to_string())
-                    .spawn(move || {
-                        let pool = rayon::ThreadPoolBuilder::new()
-                            .num_threads(notification_threads)
-                            .thread_name(|i| format!("solRpcNotify{i:02}"))
-                            .build()
-                            .unwrap();
-                        pool.install(|| {
-                            if let Some(rpc_notifier_ready) = rpc_notifier_ready {
-                                rpc_notifier_ready.fetch_or(true, Ordering::Relaxed);
-                            }
-                            Self::process_notifications(
-                                exit,
-                                max_complete_transaction_status_slot,
-                                max_complete_rewards_slot,
-                                blockstore,
-                                notifier,
-                                notification_receiver,
-                                subscriptions,
-                                bank_forks,
-                                block_commitment_cache,
-                                optimistically_confirmed_bank,
-                            )
-                        });
-                    })
-                    .unwrap(),
-            )
-        };
+            Builder::new()
+                .name("solRpcNotifier".to_string())
+                .spawn(move || {
+                    let pool = rayon::ThreadPoolBuilder::new()
+                        .num_threads(notification_threads.get())
+                        .thread_name(|i| format!("solRpcNotify{i:02}"))
+                        .build()
+                        .unwrap();
+                    pool.install(|| {
+                        if let Some(rpc_notifier_ready) = rpc_notifier_ready {
+                            rpc_notifier_ready.fetch_or(true, Ordering::Relaxed);
+                        }
+                        Self::process_notifications(
+                            exit,
+                            max_complete_transaction_status_slot,
+                            max_complete_rewards_slot,
+                            blockstore,
+                            notifier,
+                            notification_receiver,
+                            subscriptions,
+                            bank_forks,
+                            block_commitment_cache,
+                            optimistically_confirmed_bank,
+                        )
+                    });
+                })
+                .unwrap()
+        });
 
         let control = SubscriptionControl::new(
             config.max_active_subscriptions,
@@ -674,11 +669,7 @@
         );
         Self {
-            notification_sender: if notification_threads == 0 {
-                None
-            } else {
-                Some(notification_sender)
-            },
+            notification_sender: config.notification_threads.map(|_| notification_sender),
             t_cleanup,
             exit,
             control,

View File

@@ -50,6 +50,7 @@ solana-net-utils = { workspace = true }
 solana-perf = { workspace = true }
 solana-poh = { workspace = true }
 solana-program-runtime = { workspace = true }
+solana-rayon-threadlimit = { workspace = true }
 solana-rpc = { workspace = true }
 solana-rpc-client = { workspace = true }
 solana-rpc-client-api = { workspace = true }

View File

@@ -26,6 +26,7 @@ use {
     solana_faucet::faucet::{self, FAUCET_PORT},
     solana_ledger::use_snapshot_archives_at_startup,
     solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE},
+    solana_rayon_threadlimit::get_thread_count,
     solana_rpc::{rpc::MAX_REQUEST_BODY_SIZE, rpc_pubsub_service::PubSubConfig},
     solana_rpc_client_api::request::MAX_MULTIPLE_ACCOUNTS,
     solana_runtime::{
@@ -1079,6 +1080,11 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
                 .takes_value(true)
                 .value_name("NUM_THREADS")
                 .validator(is_parsable::<usize>)
+                .default_value_if(
+                    "full_rpc_api",
+                    None,
+                    &default_args.rpc_pubsub_notification_threads,
+                )
                 .help(
                     "The maximum number of threads that RPC PubSub will use for generating \
                      notifications. 0 will disable RPC PubSub notifications",
@@ -2138,6 +2144,7 @@ pub struct DefaultArgs {
     pub rpc_bigtable_max_message_size: String,
     pub rpc_max_request_body_size: String,
     pub rpc_pubsub_worker_threads: String,
+    pub rpc_pubsub_notification_threads: String,
 
     pub maximum_local_snapshot_age: String,
     pub maximum_full_snapshot_archives_to_retain: String,
@@ -2225,6 +2232,7 @@ impl DefaultArgs {
             rpc_bigtable_max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE
                 .to_string(),
             rpc_pubsub_worker_threads: "4".to_string(),
+            rpc_pubsub_notification_threads: get_thread_count().to_string(),
             maximum_full_snapshot_archives_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN
                 .to_string(),
             maximum_incremental_snapshot_archives_to_retain:

View File

@@ -1382,11 +1382,9 @@ pub fn main() {
                 usize
             ),
             worker_threads: value_t_or_exit!(matches, "rpc_pubsub_worker_threads", usize),
-            notification_threads: if full_api {
-                value_of(&matches, "rpc_pubsub_notification_threads")
-            } else {
-                Some(0)
-            },
+            notification_threads: value_t!(matches, "rpc_pubsub_notification_threads", usize)
+                .ok()
+                .and_then(NonZeroUsize::new),
         },
         voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
         wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
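
Separately, a tiny sketch of the Option<NonZeroUsize> contract that the rpc_subscriptions.rs and main.rs changes above rely on: None, whether from an unset arg or an explicit 0, means no notifier threads are spawned. The helper name below is made up for illustration and is not part of the repo.

// Minimal sketch of the Option<NonZeroUsize> contract; notifier_pool_size is a
// hypothetical helper standing in for the map() call in RpcSubscriptions.
use std::num::NonZeroUsize;

fn notifier_pool_size(notification_threads: Option<NonZeroUsize>) -> Option<usize> {
    // map() plays the role of the old `if notification_threads == 0` check.
    notification_threads.map(NonZeroUsize::get)
}

fn main() {
    assert_eq!(notifier_pool_size(NonZeroUsize::new(0)), None); // 0 disables notifications
    assert_eq!(notifier_pool_size(NonZeroUsize::new(4)), Some(4));
}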