Use generics for add/remove subscriptions

This commit is contained in:
Tyera Eulberg 2019-03-06 15:31:16 -07:00 committed by Grimes
parent 0badc90058
commit 9fedc9513b
1 changed files with 45 additions and 36 deletions

View File

@ -1,6 +1,7 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::rpc_status::RpcSignatureStatus;
use core::hash::Hash;
use jsonrpc_core::futures::Future;
use jsonrpc_pubsub::typed::Sink;
use jsonrpc_pubsub::SubscriptionId;
@ -14,6 +15,46 @@ use std::sync::RwLock;
type RpcAccountSubscriptions = RwLock<HashMap<Pubkey, HashMap<SubscriptionId, Sink<Account>>>>;
type RpcSignatureSubscriptions =
RwLock<HashMap<Signature, HashMap<SubscriptionId, Sink<RpcSignatureStatus>>>>;
fn add_subscription<K, S>(
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, Sink<S>>>,
hashmap_key: &K,
sub_id: &SubscriptionId,
sink: &Sink<S>,
) where
K: Eq + Hash + Clone + Copy,
S: Clone,
{
if let Some(current_hashmap) = subscriptions.get_mut(hashmap_key) {
current_hashmap.insert(sub_id.clone(), sink.clone());
return;
}
let mut hashmap = HashMap::new();
hashmap.insert(sub_id.clone(), sink.clone());
subscriptions.insert(*hashmap_key, hashmap);
}
fn remove_subscription<K, S>(
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, Sink<S>>>,
sub_id: &SubscriptionId,
) -> bool
where
K: Eq + Hash + Clone + Copy,
S: Clone,
{
let mut found = false;
subscriptions.retain(|_, v| {
v.retain(|k, _| {
if *k == *sub_id {
found = true;
}
!found
});
!v.is_empty()
});
found
}
pub struct RpcSubscriptions {
account_subscriptions: RpcAccountSubscriptions,
signature_subscriptions: RpcSignatureSubscriptions,
@ -62,28 +103,12 @@ impl RpcSubscriptions {
sink: &Sink<Account>,
) {
let mut subscriptions = self.account_subscriptions.write().unwrap();
if let Some(current_hashmap) = subscriptions.get_mut(pubkey) {
current_hashmap.insert(sub_id.clone(), sink.clone());
return;
}
let mut hashmap = HashMap::new();
hashmap.insert(sub_id.clone(), sink.clone());
subscriptions.insert(*pubkey, hashmap);
add_subscription(&mut subscriptions, pubkey, sub_id, sink);
}
pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.account_subscriptions.write().unwrap();
let mut found = false;
subscriptions.retain(|_, v| {
v.retain(|k, _| {
if *k == *id {
found = true;
}
!found
});
!v.is_empty()
});
found
remove_subscription(&mut subscriptions, id)
}
pub fn add_signature_subscription(
@ -93,28 +118,12 @@ impl RpcSubscriptions {
sink: &Sink<RpcSignatureStatus>,
) {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
if let Some(current_hashmap) = subscriptions.get_mut(signature) {
current_hashmap.insert(sub_id.clone(), sink.clone());
return;
}
let mut hashmap = HashMap::new();
hashmap.insert(sub_id.clone(), sink.clone());
subscriptions.insert(*signature, hashmap);
add_subscription(&mut subscriptions, signature, sub_id, sink);
}
pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
let mut found = false;
subscriptions.retain(|_, v| {
v.retain(|k, _| {
if *k == *id {
found = true;
}
!found
});
!v.is_empty()
});
found
remove_subscription(&mut subscriptions, id)
}
/// Notify subscribers of changes to any accounts or new signatures since