validator: expose max active pubsub subscriptions to CLI

This commit is contained in:
Trent Nelson 2021-06-16 22:28:23 -06:00 committed by mergify[bot]
parent db37680f6f
commit 5efc48fc69
3 changed files with 42 additions and 7 deletions

View File

@ -27,7 +27,7 @@ use {
}, },
}; };
const MAX_ACTIVE_SUBSCRIPTIONS: usize = 100_000; pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 100_000;
// Suppress needless_return due to // Suppress needless_return due to
// https://github.com/paritytech/jsonrpc/blob/2d38e6424d8461cdf72e78425ce67d51af9c6586/derive/src/lib.rs#L204 // https://github.com/paritytech/jsonrpc/blob/2d38e6424d8461cdf72e78425ce67d51af9c6586/derive/src/lib.rs#L204
@ -194,25 +194,35 @@ pub trait RpcSolPubSub {
pub struct RpcSolPubSubImpl { pub struct RpcSolPubSubImpl {
uid: Arc<atomic::AtomicUsize>, uid: Arc<atomic::AtomicUsize>,
subscriptions: Arc<RpcSubscriptions>, subscriptions: Arc<RpcSubscriptions>,
max_active_subscriptions: usize,
} }
impl RpcSolPubSubImpl { impl RpcSolPubSubImpl {
pub fn new(subscriptions: Arc<RpcSubscriptions>) -> Self { pub fn new(subscriptions: Arc<RpcSubscriptions>, max_active_subscriptions: usize) -> Self {
let uid = Arc::new(atomic::AtomicUsize::default()); let uid = Arc::new(atomic::AtomicUsize::default());
Self { uid, subscriptions } Self {
uid,
subscriptions,
max_active_subscriptions,
}
} }
#[cfg(test)] #[cfg(test)]
fn default_with_bank_forks(bank_forks: Arc<RwLock<BankForks>>) -> Self { fn default_with_bank_forks(bank_forks: Arc<RwLock<BankForks>>) -> Self {
let uid = Arc::new(atomic::AtomicUsize::default()); let uid = Arc::new(atomic::AtomicUsize::default());
let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks)); let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks));
Self { uid, subscriptions } let max_active_subscriptions = MAX_ACTIVE_SUBSCRIPTIONS;
Self {
uid,
subscriptions,
max_active_subscriptions,
}
} }
fn check_subscription_count(&self) -> Result<()> { fn check_subscription_count(&self) -> Result<()> {
let num_subscriptions = self.subscriptions.total(); let num_subscriptions = self.subscriptions.total();
debug!("Total existing subscriptions: {}", num_subscriptions); debug!("Total existing subscriptions: {}", num_subscriptions);
if num_subscriptions >= MAX_ACTIVE_SUBSCRIPTIONS { if num_subscriptions >= self.max_active_subscriptions {
info!("Node subscription limit reached"); info!("Node subscription limit reached");
Err(Error { Err(Error {
code: ErrorCode::InternalError, code: ErrorCode::InternalError,
@ -630,6 +640,7 @@ mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)), )),
uid: Arc::new(atomic::AtomicUsize::default()), uid: Arc::new(atomic::AtomicUsize::default()),
max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
}; };
// Test signature subscriptions // Test signature subscriptions
@ -810,6 +821,7 @@ mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)), )),
uid: Arc::new(atomic::AtomicUsize::default()), uid: Arc::new(atomic::AtomicUsize::default()),
max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
}; };
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification");
@ -920,6 +932,7 @@ mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)), )),
uid: Arc::new(atomic::AtomicUsize::default()), uid: Arc::new(atomic::AtomicUsize::default()),
max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
}; };
let session = create_session(); let session = create_session();
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification");

View File

@ -2,7 +2,7 @@
use { use {
crate::{ crate::{
rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl}, rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl, MAX_ACTIVE_SUBSCRIPTIONS},
rpc_subscriptions::RpcSubscriptions, rpc_subscriptions::RpcSubscriptions,
}, },
jsonrpc_pubsub::{PubSubHandler, Session}, jsonrpc_pubsub::{PubSubHandler, Session},
@ -29,6 +29,7 @@ pub struct PubSubConfig {
pub max_fragment_size: usize, pub max_fragment_size: usize,
pub max_in_buffer_capacity: usize, pub max_in_buffer_capacity: usize,
pub max_out_buffer_capacity: usize, pub max_out_buffer_capacity: usize,
pub max_active_subscriptions: usize,
} }
impl Default for PubSubConfig { impl Default for PubSubConfig {
@ -39,6 +40,7 @@ impl Default for PubSubConfig {
max_fragment_size: 50 * 1024, // 50KB max_fragment_size: 50 * 1024, // 50KB
max_in_buffer_capacity: 50 * 1024, // 50KB max_in_buffer_capacity: 50 * 1024, // 50KB
max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc
max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
} }
} }
} }
@ -55,7 +57,10 @@ impl PubSubService {
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
info!("rpc_pubsub bound to {:?}", pubsub_addr); info!("rpc_pubsub bound to {:?}", pubsub_addr);
let rpc = RpcSolPubSubImpl::new(subscriptions.clone()); let rpc = RpcSolPubSubImpl::new(
subscriptions.clone(),
pubsub_config.max_active_subscriptions,
);
let exit_ = exit.clone(); let exit_ = exit.clone();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()

View File

@ -1003,6 +1003,8 @@ pub fn main() {
PubSubConfig::default().max_in_buffer_capacity.to_string(); PubSubConfig::default().max_in_buffer_capacity.to_string();
let default_rpc_pubsub_max_out_buffer_capacity = let default_rpc_pubsub_max_out_buffer_capacity =
PubSubConfig::default().max_out_buffer_capacity.to_string(); PubSubConfig::default().max_out_buffer_capacity.to_string();
let default_rpc_pubsub_max_active_subscriptions =
PubSubConfig::default().max_active_subscriptions.to_string();
let default_rpc_send_transaction_retry_ms = ValidatorConfig::default() let default_rpc_send_transaction_retry_ms = ValidatorConfig::default()
.send_transaction_retry_ms .send_transaction_retry_ms
.to_string(); .to_string();
@ -1610,6 +1612,16 @@ pub fn main() {
.default_value(&default_rpc_pubsub_max_out_buffer_capacity) .default_value(&default_rpc_pubsub_max_out_buffer_capacity)
.help("The maximum size in bytes to which the outgoing websocket buffer can grow."), .help("The maximum size in bytes to which the outgoing websocket buffer can grow."),
) )
.arg(
Arg::with_name("rpc_pubsub_max_active_subscriptions")
.long("rpc-pubsub-max-active-subscriptions")
.takes_value(true)
.value_name("NUMBER")
.validator(is_parsable::<usize>)
.default_value(&default_rpc_pubsub_max_active_subscriptions)
.help("The maximum number of active subscriptions that RPC PubSub will accept \
across all connections."),
)
.arg( .arg(
Arg::with_name("rpc_send_transaction_retry_ms") Arg::with_name("rpc_send_transaction_retry_ms")
.long("rpc-send-retry-ms") .long("rpc-send-retry-ms")
@ -2187,6 +2199,11 @@ pub fn main() {
"rpc_pubsub_max_out_buffer_capacity", "rpc_pubsub_max_out_buffer_capacity",
usize usize
), ),
max_active_subscriptions: value_t_or_exit!(
matches,
"rpc_pubsub_max_active_subscriptions",
usize
),
}, },
voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode, voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode,
wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(), wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),