diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 7072443d36..5420d229e9 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -1,6 +1,7 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request use crate::rpc_status::RpcSignatureStatus; +use core::hash::Hash; use jsonrpc_core::futures::Future; use jsonrpc_pubsub::typed::Sink; use jsonrpc_pubsub::SubscriptionId; @@ -14,6 +15,46 @@ use std::sync::RwLock; type RpcAccountSubscriptions = RwLock>>>; type RpcSignatureSubscriptions = RwLock>>>; + +fn add_subscription( + subscriptions: &mut HashMap>>, + hashmap_key: &K, + sub_id: &SubscriptionId, + sink: &Sink, +) where + K: Eq + Hash + Clone + Copy, + S: Clone, +{ + if let Some(current_hashmap) = subscriptions.get_mut(hashmap_key) { + current_hashmap.insert(sub_id.clone(), sink.clone()); + return; + } + let mut hashmap = HashMap::new(); + hashmap.insert(sub_id.clone(), sink.clone()); + subscriptions.insert(*hashmap_key, hashmap); +} + +fn remove_subscription( + subscriptions: &mut HashMap>>, + sub_id: &SubscriptionId, +) -> bool +where + K: Eq + Hash + Clone + Copy, + S: Clone, +{ + let mut found = false; + subscriptions.retain(|_, v| { + v.retain(|k, _| { + if *k == *sub_id { + found = true; + } + !found + }); + !v.is_empty() + }); + found +} + pub struct RpcSubscriptions { account_subscriptions: RpcAccountSubscriptions, signature_subscriptions: RpcSignatureSubscriptions, @@ -62,28 +103,12 @@ impl RpcSubscriptions { sink: &Sink, ) { let mut subscriptions = self.account_subscriptions.write().unwrap(); - if let Some(current_hashmap) = subscriptions.get_mut(pubkey) { - current_hashmap.insert(sub_id.clone(), sink.clone()); - return; - } - let mut hashmap = HashMap::new(); - hashmap.insert(sub_id.clone(), sink.clone()); - subscriptions.insert(*pubkey, hashmap); + add_subscription(&mut subscriptions, pubkey, sub_id, sink); } pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.account_subscriptions.write().unwrap(); - let mut found = false; - subscriptions.retain(|_, v| { - v.retain(|k, _| { - if *k == *id { - found = true; - } - !found - }); - !v.is_empty() - }); - found + remove_subscription(&mut subscriptions, id) } pub fn add_signature_subscription( @@ -93,28 +118,12 @@ impl RpcSubscriptions { sink: &Sink, ) { let mut subscriptions = self.signature_subscriptions.write().unwrap(); - if let Some(current_hashmap) = subscriptions.get_mut(signature) { - current_hashmap.insert(sub_id.clone(), sink.clone()); - return; - } - let mut hashmap = HashMap::new(); - hashmap.insert(sub_id.clone(), sink.clone()); - subscriptions.insert(*signature, hashmap); + add_subscription(&mut subscriptions, signature, sub_id, sink); } pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.signature_subscriptions.write().unwrap(); - let mut found = false; - subscriptions.retain(|_, v| { - v.retain(|k, _| { - if *k == *id { - found = true; - } - !found - }); - !v.is_empty() - }); - found + remove_subscription(&mut subscriptions, id) } /// Notify subscribers of changes to any accounts or new signatures since