`--rpc-pubsub-notification-threads 0` now turns off the internal PubSub notification machinery (#25307)

This commit is contained in:
Michael Vines 2022-05-17 20:23:51 -07:00 committed by GitHub
parent 0bdc4cdb7e
commit 795f6eda44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 38 deletions

View File

@ -527,7 +527,7 @@ impl PubsubNotificationStats {
}
pub struct RpcSubscriptions {
notification_sender: Sender<TimestampedNotificationEntry>,
notification_sender: Option<Sender<TimestampedNotificationEntry>>,
t_cleanup: Option<JoinHandle<()>>,
exit: Arc<AtomicBool>,
@ -626,30 +626,36 @@ impl RpcSubscriptions {
config.queue_capacity_bytes,
)),
};
let notification_threads = config.notification_threads;
let t_cleanup = Builder::new()
.name("solana-rpc-notifications".to_string())
.spawn(move || {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(notification_threads.unwrap_or_else(get_thread_count))
.thread_name(|i| format!("sol-sub-notif-{}", i))
.build()
.unwrap();
pool.install(|| {
Self::process_notifications(
exit_clone,
max_complete_transaction_status_slot,
blockstore,
notifier,
notification_receiver,
subscriptions,
bank_forks,
block_commitment_cache,
optimistically_confirmed_bank,
)
});
})
.unwrap();
let notification_threads = config.notification_threads.unwrap_or_else(get_thread_count);
let t_cleanup = if notification_threads == 0 {
None
} else {
Some(
Builder::new()
.name("solana-rpc-notifications".to_string())
.spawn(move || {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(notification_threads)
.thread_name(|i| format!("sol-sub-notif-{}", i))
.build()
.unwrap();
pool.install(|| {
Self::process_notifications(
exit_clone,
max_complete_transaction_status_slot,
blockstore,
notifier,
notification_receiver,
subscriptions,
bank_forks,
block_commitment_cache,
optimistically_confirmed_bank,
)
});
})
.unwrap(),
)
};
let control = SubscriptionControl::new(
config.max_active_subscriptions,
@ -658,9 +664,8 @@ impl RpcSubscriptions {
);
Self {
notification_sender,
t_cleanup: Some(t_cleanup),
notification_sender: Some(notification_sender),
t_cleanup,
exit: exit.clone(),
control,
}
@ -735,13 +740,15 @@ impl RpcSubscriptions {
}
fn enqueue_notification(&self, notification_entry: NotificationEntry) {
match self.notification_sender.send(notification_entry.into()) {
Ok(()) => (),
Err(SendError(notification)) => {
warn!(
"Dropped RPC Notification - receiver disconnected : {:?}",
notification
);
if let Some(ref notification_sender) = self.notification_sender {
match notification_sender.send(notification_entry.into()) {
Ok(()) => (),
Err(SendError(notification)) => {
warn!(
"Dropped RPC Notification - receiver disconnected : {:?}",
notification
);
}
}
}
}

View File

@ -1360,11 +1360,12 @@ pub fn main() {
.arg(
Arg::with_name("rpc_pubsub_notification_threads")
.long("rpc-pubsub-notification-threads")
.requires("full_rpc_api")
.takes_value(true)
.value_name("NUM_THREADS")
.validator(is_parsable::<usize>)
.help("The maximum number of threads that RPC PubSub will use \
for generating notifications."),
for generating notifications. 0 will disable RPC PubSub notifications"),
)
.arg(
Arg::with_name("rpc_send_transaction_retry_ms")
@ -2417,6 +2418,7 @@ pub fn main() {
);
exit(1);
}
let full_api = matches.is_present("full_rpc_api");
let mut validator_config = ValidatorConfig {
require_tower: matches.is_present("require_tower"),
@ -2438,7 +2440,7 @@ pub fn main() {
faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| {
solana_net_utils::parse_host_port(address).expect("failed to parse faucet address")
}),
full_api: matches.is_present("full_rpc_api"),
full_api,
obsolete_v1_7_api: matches.is_present("obsolete_v1_7_rpc_api"),
max_multiple_accounts: Some(value_t_or_exit!(
matches,
@ -2484,7 +2486,11 @@ pub fn main() {
usize
),
worker_threads: value_t_or_exit!(matches, "rpc_pubsub_worker_threads", usize),
notification_threads: value_of(&matches, "rpc_pubsub_notification_threads"),
notification_threads: if full_api {
value_of(&matches, "rpc_pubsub_notification_threads")
} else {
Some(0)
},
},
voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),