diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 287d8ee36e..574a1bb832 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -24,6 +24,8 @@ use std::{ sync::{atomic, Arc}, }; +const MAX_ACTIVE_SUBSCRIPTIONS: usize = 100_000; + // Suppress needless_return due to // https://github.com/paritytech/jsonrpc/blob/2d38e6424d8461cdf72e78425ce67d51af9c6586/derive/src/lib.rs#L204 // Once https://github.com/paritytech/jsonrpc/issues/418 is resolved, try to remove this clippy allow @@ -179,6 +181,22 @@ impl RpcSolPubSubImpl { let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks)); Self { uid, subscriptions } } + + fn check_subscription_count(&self) -> Result<()> { + let num_subscriptions = self.subscriptions.total(); + debug!("Total existing subscriptions: {}", num_subscriptions); + if num_subscriptions >= MAX_ACTIVE_SUBSCRIPTIONS { + info!("Node subscription limit reached"); + Err(Error { + code: ErrorCode::InternalError, + message: "Internal Error: Subscription refused. Node subscription limit reached" + .into(), + data: None, + }) + } else { + Ok(()) + } + } } fn param(param_str: &str, thing: &str) -> Result { @@ -199,6 +217,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl { pubkey_str: String, config: Option, ) { + if let Err(err) = self.check_subscription_count() { + subscriber.reject(err).unwrap_or_default(); + return; + } match param::(&pubkey_str, "pubkey") { Ok(pubkey) => { let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); @@ -207,7 +229,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { self.subscriptions .add_account_subscription(pubkey, config, sub_id, subscriber) } - Err(e) => subscriber.reject(e).unwrap(), + Err(e) => subscriber.reject(e).unwrap_or_default(), } } @@ -235,6 +257,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl { pubkey_str: String, config: Option, ) { + if let Err(err) = self.check_subscription_count() { + subscriber.reject(err).unwrap_or_default(); + return; + } match param::(&pubkey_str, "pubkey") { Ok(pubkey) => { let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); @@ -243,7 +269,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { self.subscriptions .add_program_subscription(pubkey, config, sub_id, subscriber) } - Err(e) => subscriber.reject(e).unwrap(), + Err(e) => subscriber.reject(e).unwrap_or_default(), } } @@ -272,6 +298,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl { config: Option, ) { info!("logs_subscribe"); + if let Err(err) = self.check_subscription_count() { + subscriber.reject(err).unwrap_or_default(); + return; + } let (address, include_votes) = match filter { RpcTransactionLogsFilter::All => (None, false), @@ -281,7 +311,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { 1 => match param::(&addresses[0], "mentions") { Ok(address) => (Some(address), false), Err(e) => { - subscriber.reject(e).unwrap(); + subscriber.reject(e).unwrap_or_default(); return; } }, @@ -294,7 +324,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { message: "Invalid Request: Only 1 address supported".into(), data: None, }) - .unwrap(); + .unwrap_or_default(); return; } } @@ -333,6 +363,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl { signature_subscribe_config: Option, ) { info!("signature_subscribe"); + if let Err(err) = self.check_subscription_count() { + subscriber.reject(err).unwrap_or_default(); + return; + } match param::(&signature_str, "signature") { Ok(signature) => { let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); @@ -348,7 +382,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { subscriber, ); } - Err(e) => subscriber.reject(e).unwrap(), + Err(e) => subscriber.reject(e).unwrap_or_default(), } } @@ -371,6 +405,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl { fn slot_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber) { info!("slot_subscribe"); + if let Err(err) = self.check_subscription_count() { + subscriber.reject(err).unwrap_or_default(); + return; + } let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let sub_id = SubscriptionId::Number(id as u64); info!("slot_subscribe: id={:?}", sub_id); @@ -392,6 +430,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl { fn vote_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber) { info!("vote_subscribe"); + if let Err(err) = self.check_subscription_count() { + subscriber.reject(err).unwrap_or_default(); + return; + } let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let sub_id = SubscriptionId::Number(id as u64); info!("vote_subscribe: id={:?}", sub_id); @@ -413,6 +455,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl { fn root_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber) { info!("root_subscribe"); + if let Err(err) = self.check_subscription_count() { + subscriber.reject(err).unwrap_or_default(); + return; + } let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let sub_id = SubscriptionId::Number(id as u64); info!("root_subscribe: id={:?}", sub_id); diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index ed6c29c2ce..9329cf8e13 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -371,6 +371,16 @@ fn filter_logs_results( } } +fn total_nested_subscriptions( + subscription_map: &RwLock>>, +) -> usize { + subscription_map + .read() + .unwrap() + .iter() + .fold(0, |acc, x| acc + x.1.len()) +} + #[derive(Clone)] struct Subscriptions { account_subscriptions: Arc, @@ -386,6 +396,24 @@ struct Subscriptions { root_subscriptions: Arc, } +impl Subscriptions { + fn total(&self) -> usize { + let mut total = 0; + total += total_nested_subscriptions(&self.account_subscriptions); + total += total_nested_subscriptions(&self.program_subscriptions); + total += total_nested_subscriptions(&self.logs_subscriptions); + total += total_nested_subscriptions(&self.signature_subscriptions); + total += total_nested_subscriptions(&self.gossip_account_subscriptions); + total += total_nested_subscriptions(&self.gossip_logs_subscriptions); + total += total_nested_subscriptions(&self.gossip_program_subscriptions); + total += total_nested_subscriptions(&self.gossip_signature_subscriptions); + total += self.slot_subscriptions.read().unwrap().len(); + total += self.vote_subscriptions.read().unwrap().len(); + total += self.root_subscriptions.read().unwrap().len(); + total + } +} + pub struct RpcSubscriptions { subscriptions: Subscriptions, notification_sender: Arc>>, @@ -594,6 +622,10 @@ impl RpcSubscriptions { notified_ids } + pub fn total(&self) -> usize { + self.subscriptions.total() + } + pub fn add_account_subscription( &self, pubkey: Pubkey, @@ -2073,4 +2105,150 @@ pub(crate) mod tests { .unwrap() .contains_key(&alice.pubkey())); } + + #[test] + fn test_total_nested_subscriptions() { + let mock_subscriptions = RwLock::new(HashMap::new()); + assert_eq!(total_nested_subscriptions(&mock_subscriptions), 0); + + mock_subscriptions + .write() + .unwrap() + .insert(0, HashMap::new()); + assert_eq!(total_nested_subscriptions(&mock_subscriptions), 0); + + mock_subscriptions + .write() + .unwrap() + .entry(0) + .and_modify(|map| { + map.insert(0, "test"); + }); + assert_eq!(total_nested_subscriptions(&mock_subscriptions), 1); + + mock_subscriptions + .write() + .unwrap() + .entry(0) + .and_modify(|map| { + map.insert(1, "test"); + }); + assert_eq!(total_nested_subscriptions(&mock_subscriptions), 2); + + mock_subscriptions + .write() + .unwrap() + .insert(1, HashMap::new()); + assert_eq!(total_nested_subscriptions(&mock_subscriptions), 2); + + mock_subscriptions + .write() + .unwrap() + .entry(1) + .and_modify(|map| { + map.insert(0, "test"); + }); + assert_eq!(total_nested_subscriptions(&mock_subscriptions), 3); + } + + #[test] + fn test_total_subscriptions() { + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let subscriptions = RpcSubscriptions::default_with_bank_forks(bank_forks); + + let (subscriber, _id_receiver, _transport_receiver) = + Subscriber::new_test("accountNotification"); + let account_sub_id = SubscriptionId::Number(0u64); + subscriptions.add_account_subscription( + Pubkey::default(), + None, + account_sub_id.clone(), + subscriber, + ); + assert_eq!(subscriptions.total(), 1); + + let (subscriber, _id_receiver, _transport_receiver) = + Subscriber::new_test("programNotification"); + let program_sub_id = SubscriptionId::Number(1u64); + subscriptions.add_program_subscription( + Pubkey::default(), + None, + program_sub_id.clone(), + subscriber, + ); + assert_eq!(subscriptions.total(), 2); + + let (subscriber, _id_receiver, _transport_receiver) = + Subscriber::new_test("logsNotification"); + let logs_sub_id = SubscriptionId::Number(2u64); + subscriptions.add_logs_subscription(None, false, None, logs_sub_id.clone(), subscriber); + assert_eq!(subscriptions.total(), 3); + + let (subscriber, _id_receiver, _transport_receiver) = + Subscriber::new_test("signatureNotification"); + let sig_sub_id = SubscriptionId::Number(3u64); + subscriptions.add_signature_subscription( + Signature::default(), + None, + sig_sub_id.clone(), + subscriber, + ); + assert_eq!(subscriptions.total(), 4); + + let (subscriber, _id_receiver, _transport_receiver) = + Subscriber::new_test("slotNotification"); + let slot_sub_id = SubscriptionId::Number(4u64); + subscriptions.add_slot_subscription(slot_sub_id.clone(), subscriber); + assert_eq!(subscriptions.total(), 5); + + let (subscriber, _id_receiver, _transport_receiver) = + Subscriber::new_test("voteNotification"); + let vote_sub_id = SubscriptionId::Number(5u64); + subscriptions.add_vote_subscription(vote_sub_id.clone(), subscriber); + assert_eq!(subscriptions.total(), 6); + + let (subscriber, _id_receiver, _transport_receiver) = + Subscriber::new_test("rootNotification"); + let root_sub_id = SubscriptionId::Number(6u64); + subscriptions.add_root_subscription(root_sub_id.clone(), subscriber); + assert_eq!(subscriptions.total(), 7); + + // Add duplicate account subscription to ensure totals include all subscriptions on all keys + let (subscriber, _id_receiver, _transport_receiver) = + Subscriber::new_test("accountNotification2"); + let account_dupe_sub_id = SubscriptionId::Number(7u64); + subscriptions.add_account_subscription( + Pubkey::default(), + None, + account_dupe_sub_id.clone(), + subscriber, + ); + assert_eq!(subscriptions.total(), 8); + + subscriptions.remove_account_subscription(&account_sub_id); + assert_eq!(subscriptions.total(), 7); + + subscriptions.remove_account_subscription(&account_dupe_sub_id); + assert_eq!(subscriptions.total(), 6); + + subscriptions.remove_program_subscription(&program_sub_id); + assert_eq!(subscriptions.total(), 5); + + subscriptions.remove_logs_subscription(&logs_sub_id); + assert_eq!(subscriptions.total(), 4); + + subscriptions.remove_signature_subscription(&sig_sub_id); + assert_eq!(subscriptions.total(), 3); + + subscriptions.remove_slot_subscription(&slot_sub_id); + assert_eq!(subscriptions.total(), 2); + + subscriptions.remove_vote_subscription(&vote_sub_id); + assert_eq!(subscriptions.total(), 1); + + subscriptions.remove_root_subscription(&root_sub_id); + assert_eq!(subscriptions.total(), 0); + } }