Add subscription cap (#14011)

* Add subscription cap

* Elide unwraps
This commit is contained in:
Tyera Eulberg 2020-12-11 18:57:40 -07:00 committed by GitHub
parent 1792100e2b
commit 8541ffa328
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 229 additions and 5 deletions

View File

@ -24,6 +24,8 @@ use std::{
sync::{atomic, Arc}, sync::{atomic, Arc},
}; };
const MAX_ACTIVE_SUBSCRIPTIONS: usize = 100_000;
// Suppress needless_return due to // Suppress needless_return due to
// https://github.com/paritytech/jsonrpc/blob/2d38e6424d8461cdf72e78425ce67d51af9c6586/derive/src/lib.rs#L204 // https://github.com/paritytech/jsonrpc/blob/2d38e6424d8461cdf72e78425ce67d51af9c6586/derive/src/lib.rs#L204
// Once https://github.com/paritytech/jsonrpc/issues/418 is resolved, try to remove this clippy allow // Once https://github.com/paritytech/jsonrpc/issues/418 is resolved, try to remove this clippy allow
@ -179,6 +181,22 @@ impl RpcSolPubSubImpl {
let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks)); let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks));
Self { uid, subscriptions } Self { uid, subscriptions }
} }
fn check_subscription_count(&self) -> Result<()> {
let num_subscriptions = self.subscriptions.total();
debug!("Total existing subscriptions: {}", num_subscriptions);
if num_subscriptions >= MAX_ACTIVE_SUBSCRIPTIONS {
info!("Node subscription limit reached");
Err(Error {
code: ErrorCode::InternalError,
message: "Internal Error: Subscription refused. Node subscription limit reached"
.into(),
data: None,
})
} else {
Ok(())
}
}
} }
fn param<T: FromStr>(param_str: &str, thing: &str) -> Result<T> { fn param<T: FromStr>(param_str: &str, thing: &str) -> Result<T> {
@ -199,6 +217,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
pubkey_str: String, pubkey_str: String,
config: Option<RpcAccountInfoConfig>, config: Option<RpcAccountInfoConfig>,
) { ) {
if let Err(err) = self.check_subscription_count() {
subscriber.reject(err).unwrap_or_default();
return;
}
match param::<Pubkey>(&pubkey_str, "pubkey") { match param::<Pubkey>(&pubkey_str, "pubkey") {
Ok(pubkey) => { Ok(pubkey) => {
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
@ -207,7 +229,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
self.subscriptions self.subscriptions
.add_account_subscription(pubkey, config, sub_id, subscriber) .add_account_subscription(pubkey, config, sub_id, subscriber)
} }
Err(e) => subscriber.reject(e).unwrap(), Err(e) => subscriber.reject(e).unwrap_or_default(),
} }
} }
@ -235,6 +257,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
pubkey_str: String, pubkey_str: String,
config: Option<RpcProgramAccountsConfig>, config: Option<RpcProgramAccountsConfig>,
) { ) {
if let Err(err) = self.check_subscription_count() {
subscriber.reject(err).unwrap_or_default();
return;
}
match param::<Pubkey>(&pubkey_str, "pubkey") { match param::<Pubkey>(&pubkey_str, "pubkey") {
Ok(pubkey) => { Ok(pubkey) => {
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
@ -243,7 +269,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
self.subscriptions self.subscriptions
.add_program_subscription(pubkey, config, sub_id, subscriber) .add_program_subscription(pubkey, config, sub_id, subscriber)
} }
Err(e) => subscriber.reject(e).unwrap(), Err(e) => subscriber.reject(e).unwrap_or_default(),
} }
} }
@ -272,6 +298,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
config: Option<RpcTransactionLogsConfig>, config: Option<RpcTransactionLogsConfig>,
) { ) {
info!("logs_subscribe"); info!("logs_subscribe");
if let Err(err) = self.check_subscription_count() {
subscriber.reject(err).unwrap_or_default();
return;
}
let (address, include_votes) = match filter { let (address, include_votes) = match filter {
RpcTransactionLogsFilter::All => (None, false), RpcTransactionLogsFilter::All => (None, false),
@ -281,7 +311,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
1 => match param::<Pubkey>(&addresses[0], "mentions") { 1 => match param::<Pubkey>(&addresses[0], "mentions") {
Ok(address) => (Some(address), false), Ok(address) => (Some(address), false),
Err(e) => { Err(e) => {
subscriber.reject(e).unwrap(); subscriber.reject(e).unwrap_or_default();
return; return;
} }
}, },
@ -294,7 +324,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
message: "Invalid Request: Only 1 address supported".into(), message: "Invalid Request: Only 1 address supported".into(),
data: None, data: None,
}) })
.unwrap(); .unwrap_or_default();
return; return;
} }
} }
@ -333,6 +363,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
signature_subscribe_config: Option<RpcSignatureSubscribeConfig>, signature_subscribe_config: Option<RpcSignatureSubscribeConfig>,
) { ) {
info!("signature_subscribe"); info!("signature_subscribe");
if let Err(err) = self.check_subscription_count() {
subscriber.reject(err).unwrap_or_default();
return;
}
match param::<Signature>(&signature_str, "signature") { match param::<Signature>(&signature_str, "signature") {
Ok(signature) => { Ok(signature) => {
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
@ -348,7 +382,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
subscriber, subscriber,
); );
} }
Err(e) => subscriber.reject(e).unwrap(), Err(e) => subscriber.reject(e).unwrap_or_default(),
} }
} }
@ -371,6 +405,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
fn slot_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<SlotInfo>) { fn slot_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<SlotInfo>) {
info!("slot_subscribe"); info!("slot_subscribe");
if let Err(err) = self.check_subscription_count() {
subscriber.reject(err).unwrap_or_default();
return;
}
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
let sub_id = SubscriptionId::Number(id as u64); let sub_id = SubscriptionId::Number(id as u64);
info!("slot_subscribe: id={:?}", sub_id); info!("slot_subscribe: id={:?}", sub_id);
@ -392,6 +430,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
fn vote_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<RpcVote>) { fn vote_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<RpcVote>) {
info!("vote_subscribe"); info!("vote_subscribe");
if let Err(err) = self.check_subscription_count() {
subscriber.reject(err).unwrap_or_default();
return;
}
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
let sub_id = SubscriptionId::Number(id as u64); let sub_id = SubscriptionId::Number(id as u64);
info!("vote_subscribe: id={:?}", sub_id); info!("vote_subscribe: id={:?}", sub_id);
@ -413,6 +455,10 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
fn root_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<Slot>) { fn root_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<Slot>) {
info!("root_subscribe"); info!("root_subscribe");
if let Err(err) = self.check_subscription_count() {
subscriber.reject(err).unwrap_or_default();
return;
}
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
let sub_id = SubscriptionId::Number(id as u64); let sub_id = SubscriptionId::Number(id as u64);
info!("root_subscribe: id={:?}", sub_id); info!("root_subscribe: id={:?}", sub_id);

View File

@ -371,6 +371,16 @@ fn filter_logs_results(
} }
} }
fn total_nested_subscriptions<K, L, V>(
subscription_map: &RwLock<HashMap<K, HashMap<L, V>>>,
) -> usize {
subscription_map
.read()
.unwrap()
.iter()
.fold(0, |acc, x| acc + x.1.len())
}
#[derive(Clone)] #[derive(Clone)]
struct Subscriptions { struct Subscriptions {
account_subscriptions: Arc<RpcAccountSubscriptions>, account_subscriptions: Arc<RpcAccountSubscriptions>,
@ -386,6 +396,24 @@ struct Subscriptions {
root_subscriptions: Arc<RpcRootSubscriptions>, root_subscriptions: Arc<RpcRootSubscriptions>,
} }
impl Subscriptions {
fn total(&self) -> usize {
let mut total = 0;
total += total_nested_subscriptions(&self.account_subscriptions);
total += total_nested_subscriptions(&self.program_subscriptions);
total += total_nested_subscriptions(&self.logs_subscriptions);
total += total_nested_subscriptions(&self.signature_subscriptions);
total += total_nested_subscriptions(&self.gossip_account_subscriptions);
total += total_nested_subscriptions(&self.gossip_logs_subscriptions);
total += total_nested_subscriptions(&self.gossip_program_subscriptions);
total += total_nested_subscriptions(&self.gossip_signature_subscriptions);
total += self.slot_subscriptions.read().unwrap().len();
total += self.vote_subscriptions.read().unwrap().len();
total += self.root_subscriptions.read().unwrap().len();
total
}
}
pub struct RpcSubscriptions { pub struct RpcSubscriptions {
subscriptions: Subscriptions, subscriptions: Subscriptions,
notification_sender: Arc<Mutex<Sender<NotificationEntry>>>, notification_sender: Arc<Mutex<Sender<NotificationEntry>>>,
@ -594,6 +622,10 @@ impl RpcSubscriptions {
notified_ids notified_ids
} }
pub fn total(&self) -> usize {
self.subscriptions.total()
}
pub fn add_account_subscription( pub fn add_account_subscription(
&self, &self,
pubkey: Pubkey, pubkey: Pubkey,
@ -2073,4 +2105,150 @@ pub(crate) mod tests {
.unwrap() .unwrap()
.contains_key(&alice.pubkey())); .contains_key(&alice.pubkey()));
} }
#[test]
fn test_total_nested_subscriptions() {
let mock_subscriptions = RwLock::new(HashMap::new());
assert_eq!(total_nested_subscriptions(&mock_subscriptions), 0);
mock_subscriptions
.write()
.unwrap()
.insert(0, HashMap::new());
assert_eq!(total_nested_subscriptions(&mock_subscriptions), 0);
mock_subscriptions
.write()
.unwrap()
.entry(0)
.and_modify(|map| {
map.insert(0, "test");
});
assert_eq!(total_nested_subscriptions(&mock_subscriptions), 1);
mock_subscriptions
.write()
.unwrap()
.entry(0)
.and_modify(|map| {
map.insert(1, "test");
});
assert_eq!(total_nested_subscriptions(&mock_subscriptions), 2);
mock_subscriptions
.write()
.unwrap()
.insert(1, HashMap::new());
assert_eq!(total_nested_subscriptions(&mock_subscriptions), 2);
mock_subscriptions
.write()
.unwrap()
.entry(1)
.and_modify(|map| {
map.insert(0, "test");
});
assert_eq!(total_nested_subscriptions(&mock_subscriptions), 3);
}
#[test]
fn test_total_subscriptions() {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let subscriptions = RpcSubscriptions::default_with_bank_forks(bank_forks);
let (subscriber, _id_receiver, _transport_receiver) =
Subscriber::new_test("accountNotification");
let account_sub_id = SubscriptionId::Number(0u64);
subscriptions.add_account_subscription(
Pubkey::default(),
None,
account_sub_id.clone(),
subscriber,
);
assert_eq!(subscriptions.total(), 1);
let (subscriber, _id_receiver, _transport_receiver) =
Subscriber::new_test("programNotification");
let program_sub_id = SubscriptionId::Number(1u64);
subscriptions.add_program_subscription(
Pubkey::default(),
None,
program_sub_id.clone(),
subscriber,
);
assert_eq!(subscriptions.total(), 2);
let (subscriber, _id_receiver, _transport_receiver) =
Subscriber::new_test("logsNotification");
let logs_sub_id = SubscriptionId::Number(2u64);
subscriptions.add_logs_subscription(None, false, None, logs_sub_id.clone(), subscriber);
assert_eq!(subscriptions.total(), 3);
let (subscriber, _id_receiver, _transport_receiver) =
Subscriber::new_test("signatureNotification");
let sig_sub_id = SubscriptionId::Number(3u64);
subscriptions.add_signature_subscription(
Signature::default(),
None,
sig_sub_id.clone(),
subscriber,
);
assert_eq!(subscriptions.total(), 4);
let (subscriber, _id_receiver, _transport_receiver) =
Subscriber::new_test("slotNotification");
let slot_sub_id = SubscriptionId::Number(4u64);
subscriptions.add_slot_subscription(slot_sub_id.clone(), subscriber);
assert_eq!(subscriptions.total(), 5);
let (subscriber, _id_receiver, _transport_receiver) =
Subscriber::new_test("voteNotification");
let vote_sub_id = SubscriptionId::Number(5u64);
subscriptions.add_vote_subscription(vote_sub_id.clone(), subscriber);
assert_eq!(subscriptions.total(), 6);
let (subscriber, _id_receiver, _transport_receiver) =
Subscriber::new_test("rootNotification");
let root_sub_id = SubscriptionId::Number(6u64);
subscriptions.add_root_subscription(root_sub_id.clone(), subscriber);
assert_eq!(subscriptions.total(), 7);
// Add duplicate account subscription to ensure totals include all subscriptions on all keys
let (subscriber, _id_receiver, _transport_receiver) =
Subscriber::new_test("accountNotification2");
let account_dupe_sub_id = SubscriptionId::Number(7u64);
subscriptions.add_account_subscription(
Pubkey::default(),
None,
account_dupe_sub_id.clone(),
subscriber,
);
assert_eq!(subscriptions.total(), 8);
subscriptions.remove_account_subscription(&account_sub_id);
assert_eq!(subscriptions.total(), 7);
subscriptions.remove_account_subscription(&account_dupe_sub_id);
assert_eq!(subscriptions.total(), 6);
subscriptions.remove_program_subscription(&program_sub_id);
assert_eq!(subscriptions.total(), 5);
subscriptions.remove_logs_subscription(&logs_sub_id);
assert_eq!(subscriptions.total(), 4);
subscriptions.remove_signature_subscription(&sig_sub_id);
assert_eq!(subscriptions.total(), 3);
subscriptions.remove_slot_subscription(&slot_sub_id);
assert_eq!(subscriptions.total(), 2);
subscriptions.remove_vote_subscription(&vote_sub_id);
assert_eq!(subscriptions.total(), 1);
subscriptions.remove_root_subscription(&root_sub_id);
assert_eq!(subscriptions.total(), 0);
}
} }