Refactor how pubsub subscriptions are added (#9042)

This commit is contained in:
Justin Starry 2020-03-25 00:53:32 +08:00 committed by GitHub
parent 7d6ea6c17e
commit 8f38bc7dc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 129 additions and 110 deletions

View File

@ -142,10 +142,12 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
let sub_id = SubscriptionId::Number(id as u64); let sub_id = SubscriptionId::Number(id as u64);
info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id); info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id);
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); self.subscriptions.add_account_subscription(
pubkey,
self.subscriptions confirmations,
.add_account_subscription(&pubkey, confirmations, &sub_id, &sink) sub_id,
subscriber,
)
} }
Err(e) => subscriber.reject(e).unwrap(), Err(e) => subscriber.reject(e).unwrap(),
} }
@ -180,10 +182,12 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
let sub_id = SubscriptionId::Number(id as u64); let sub_id = SubscriptionId::Number(id as u64);
info!("program_subscribe: account={:?} id={:?}", pubkey, sub_id); info!("program_subscribe: account={:?} id={:?}", pubkey, sub_id);
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); self.subscriptions.add_program_subscription(
pubkey,
self.subscriptions confirmations,
.add_program_subscription(&pubkey, confirmations, &sub_id, &sink) sub_id,
subscriber,
)
} }
Err(e) => subscriber.reject(e).unwrap(), Err(e) => subscriber.reject(e).unwrap(),
} }
@ -222,13 +226,11 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
"signature_subscribe: signature={:?} id={:?}", "signature_subscribe: signature={:?} id={:?}",
signature, sub_id signature, sub_id
); );
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
self.subscriptions.add_signature_subscription( self.subscriptions.add_signature_subscription(
&signature, signature,
confirmations, confirmations,
&sub_id, sub_id,
&sink, subscriber,
); );
} }
Err(e) => subscriber.reject(e).unwrap(), Err(e) => subscriber.reject(e).unwrap(),
@ -257,9 +259,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
let sub_id = SubscriptionId::Number(id as u64); let sub_id = SubscriptionId::Number(id as u64);
info!("slot_subscribe: id={:?}", sub_id); info!("slot_subscribe: id={:?}", sub_id);
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); self.subscriptions.add_slot_subscription(sub_id, subscriber);
self.subscriptions.add_slot_subscription(&sub_id, &sink);
} }
fn slot_unsubscribe(&self, _meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> { fn slot_unsubscribe(&self, _meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {

View File

@ -2,7 +2,10 @@
use core::hash::Hash; use core::hash::Hash;
use jsonrpc_core::futures::Future; use jsonrpc_core::futures::Future;
use jsonrpc_pubsub::{typed::Sink, SubscriptionId}; use jsonrpc_pubsub::{
typed::{Sink, Subscriber},
SubscriptionId,
};
use serde::Serialize; use serde::Serialize;
use solana_client::rpc_response::{Response, RpcAccount, RpcKeyedAccount, RpcResponseContext}; use solana_client::rpc_response::{Response, RpcAccount, RpcKeyedAccount, RpcResponseContext};
use solana_ledger::bank_forks::BankForks; use solana_ledger::bank_forks::BankForks;
@ -64,27 +67,28 @@ type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
fn add_subscription<K, S>( fn add_subscription<K, S>(
subscriptions: &mut HashMap<K, HashMap<SubscriptionId, (Sink<S>, Confirmations)>>, subscriptions: &mut HashMap<K, HashMap<SubscriptionId, (Sink<S>, Confirmations)>>,
hashmap_key: &K, hashmap_key: K,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
sub_id: &SubscriptionId, sub_id: SubscriptionId,
sink: &Sink<S>, subscriber: Subscriber<S>,
) where ) where
K: Eq + Hash + Clone + Copy, K: Eq + Hash,
S: Clone, S: Clone,
{ {
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let confirmations = confirmations.unwrap_or(0); let confirmations = confirmations.unwrap_or(0);
let confirmations = if confirmations > MAX_LOCKOUT_HISTORY { let confirmations = if confirmations > MAX_LOCKOUT_HISTORY {
MAX_LOCKOUT_HISTORY MAX_LOCKOUT_HISTORY
} else { } else {
confirmations confirmations
}; };
if let Some(current_hashmap) = subscriptions.get_mut(hashmap_key) { if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) {
current_hashmap.insert(sub_id.clone(), (sink.clone(), confirmations)); current_hashmap.insert(sub_id, (sink, confirmations));
return; return;
} }
let mut hashmap = HashMap::new(); let mut hashmap = HashMap::new();
hashmap.insert(sub_id.clone(), (sink.clone(), confirmations)); hashmap.insert(sub_id, (sink, confirmations));
subscriptions.insert(*hashmap_key, hashmap); subscriptions.insert(hashmap_key, hashmap);
} }
fn remove_subscription<K, S>( fn remove_subscription<K, S>(
@ -92,13 +96,13 @@ fn remove_subscription<K, S>(
sub_id: &SubscriptionId, sub_id: &SubscriptionId,
) -> bool ) -> bool
where where
K: Eq + Hash + Clone + Copy, K: Eq + Hash,
S: Clone, S: Clone,
{ {
let mut found = false; let mut found = false;
subscriptions.retain(|_, v| { subscriptions.retain(|_, v| {
v.retain(|k, _| { v.retain(|k, _| {
let retain = *k != *sub_id; let retain = k != sub_id;
if !retain { if !retain {
found = true; found = true;
} }
@ -358,13 +362,19 @@ impl RpcSubscriptions {
pub fn add_account_subscription( pub fn add_account_subscription(
&self, &self,
pubkey: &Pubkey, pubkey: Pubkey,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
sub_id: &SubscriptionId, sub_id: SubscriptionId,
sink: &Sink<Response<RpcAccount>>, subscriber: Subscriber<Response<RpcAccount>>,
) { ) {
let mut subscriptions = self.account_subscriptions.write().unwrap(); let mut subscriptions = self.account_subscriptions.write().unwrap();
add_subscription(&mut subscriptions, pubkey, confirmations, sub_id, sink); add_subscription(
&mut subscriptions,
pubkey,
confirmations,
sub_id,
subscriber,
);
} }
pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool { pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool {
@ -374,13 +384,19 @@ impl RpcSubscriptions {
pub fn add_program_subscription( pub fn add_program_subscription(
&self, &self,
program_id: &Pubkey, program_id: Pubkey,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
sub_id: &SubscriptionId, sub_id: SubscriptionId,
sink: &Sink<Response<RpcKeyedAccount>>, subscriber: Subscriber<Response<RpcKeyedAccount>>,
) { ) {
let mut subscriptions = self.program_subscriptions.write().unwrap(); let mut subscriptions = self.program_subscriptions.write().unwrap();
add_subscription(&mut subscriptions, program_id, confirmations, sub_id, sink); add_subscription(
&mut subscriptions,
program_id,
confirmations,
sub_id,
subscriber,
);
} }
pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool { pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool {
@ -390,13 +406,19 @@ impl RpcSubscriptions {
pub fn add_signature_subscription( pub fn add_signature_subscription(
&self, &self,
signature: &Signature, signature: Signature,
confirmations: Option<Confirmations>, confirmations: Option<Confirmations>,
sub_id: &SubscriptionId, sub_id: SubscriptionId,
sink: &Sink<Response<transaction::Result<()>>>, subscriber: Subscriber<Response<transaction::Result<()>>>,
) { ) {
let mut subscriptions = self.signature_subscriptions.write().unwrap(); let mut subscriptions = self.signature_subscriptions.write().unwrap();
add_subscription(&mut subscriptions, signature, confirmations, sub_id, sink); add_subscription(
&mut subscriptions,
signature,
confirmations,
sub_id,
subscriber,
);
} }
pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool { pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool {
@ -410,9 +432,10 @@ impl RpcSubscriptions {
self.enqueue_notification(NotificationEntry::Bank((current_slot, bank_forks.clone()))); self.enqueue_notification(NotificationEntry::Bank((current_slot, bank_forks.clone())));
} }
pub fn add_slot_subscription(&self, sub_id: &SubscriptionId, sink: &Sink<SlotInfo>) { pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<SlotInfo>) {
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let mut subscriptions = self.slot_subscriptions.write().unwrap(); let mut subscriptions = self.slot_subscriptions.write().unwrap();
subscriptions.insert(sub_id.clone(), sink.clone()); subscriptions.insert(sub_id, sink);
} }
pub fn remove_slot_subscription(&self, id: &SubscriptionId) -> bool { pub fn remove_slot_subscription(&self, id: &SubscriptionId) -> bool {
@ -604,10 +627,9 @@ pub(crate) mod tests {
let (subscriber, _id_receiver, transport_receiver) = let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("accountNotification"); Subscriber::new_test("accountNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit); let subscriptions = RpcSubscriptions::new(&exit);
subscriptions.add_account_subscription(&alice.pubkey(), None, &sub_id, &sink); subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber);
assert!(subscriptions assert!(subscriptions
.account_subscriptions .account_subscriptions
@ -674,10 +696,14 @@ pub(crate) mod tests {
let (subscriber, _id_receiver, transport_receiver) = let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("programNotification"); Subscriber::new_test("programNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit); let subscriptions = RpcSubscriptions::new(&exit);
subscriptions.add_program_subscription(&solana_budget_program::id(), None, &sub_id, &sink); subscriptions.add_program_subscription(
solana_budget_program::id(),
None,
sub_id.clone(),
subscriber,
);
assert!(subscriptions assert!(subscriptions
.program_subscriptions .program_subscriptions
@ -754,40 +780,62 @@ pub(crate) mod tests {
let bank_forks = Arc::new(RwLock::new(bank_forks)); let bank_forks = Arc::new(RwLock::new(bank_forks));
let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("signatureNotification");
let sink = subscriber.assign_id(SubscriptionId::Number(0)).unwrap();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit); let subscriptions = RpcSubscriptions::new(&exit);
let (past_bank_sub, _id_receiver, past_bank_recv) =
Subscriber::new_test("signatureNotification");
let (processed_sub, _id_receiver, processed_recv) =
Subscriber::new_test("signatureNotification");
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
&past_bank_tx.signatures[0], past_bank_tx.signatures[0],
Some(0), Some(0),
&SubscriptionId::Number(1 as u64), SubscriptionId::Number(1 as u64),
&sink.clone(), Subscriber::new_test("signatureNotification").0,
); );
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
&processed_tx.signatures[0], past_bank_tx.signatures[0],
Some(0), Some(1),
&SubscriptionId::Number(2 as u64), SubscriptionId::Number(2 as u64),
&sink.clone(), past_bank_sub,
); );
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
&unprocessed_tx.signatures[0], processed_tx.signatures[0],
Some(0), Some(0),
&SubscriptionId::Number(3 as u64), SubscriptionId::Number(3 as u64),
&sink.clone(), processed_sub,
);
subscriptions.add_signature_subscription(
unprocessed_tx.signatures[0],
Some(0),
SubscriptionId::Number(4 as u64),
Subscriber::new_test("signatureNotification").0,
); );
{ {
let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); let sig_subs = subscriptions.signature_subscriptions.read().unwrap();
assert!(sig_subs.contains_key(&past_bank_tx.signatures[0])); assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 2);
assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0])); assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0]));
assert!(sig_subs.contains_key(&processed_tx.signatures[0])); assert!(sig_subs.contains_key(&processed_tx.signatures[0]));
} }
subscriptions.notify_subscribers(1, &bank_forks); subscriptions.notify_subscribers(1, &bank_forks);
let response = robust_poll_or_panic(transport_receiver);
let expected_res: Option<transaction::Result<()>> = Some(Ok(())); let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
let expected = json!({
"jsonrpc": "2.0",
"method": "signatureNotification",
"params": {
"result": {
"context": { "slot": 0 },
"value": expected_res,
},
"subscription": 2,
}
});
let response = robust_poll_or_panic(past_bank_recv);
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
let expected = json!({ let expected = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "signatureNotification", "method": "signatureNotification",
@ -796,9 +844,10 @@ pub(crate) mod tests {
"context": { "slot": 1 }, "context": { "slot": 1 },
"value": expected_res, "value": expected_res,
}, },
"subscription": 0, "subscription": 3,
} }
}); });
let response = robust_poll_or_panic(processed_recv);
assert_eq!(serde_json::to_string(&expected).unwrap(), response); assert_eq!(serde_json::to_string(&expected).unwrap(), response);
let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); let sig_subs = subscriptions.signature_subscriptions.read().unwrap();
@ -814,34 +863,6 @@ pub(crate) mod tests {
sig_subs.get(&unprocessed_tx.signatures[0]).unwrap().len(), sig_subs.get(&unprocessed_tx.signatures[0]).unwrap().len(),
1 1
); );
let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("signatureNotification");
let sink = subscriber.assign_id(SubscriptionId::Number(0)).unwrap();
let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit);
subscriptions.add_signature_subscription(
&past_bank_tx.signatures[0],
Some(1),
&SubscriptionId::Number(1 as u64),
&sink.clone(),
);
subscriptions.notify_subscribers(1, &bank_forks);
let response = robust_poll_or_panic(transport_receiver);
let expected_res: Option<transaction::Result<()>> = Some(Ok(()));
let expected = json!({
"jsonrpc": "2.0",
"method": "signatureNotification",
"params": {
"result": {
"context": { "slot": 0 },
"value": expected_res,
},
"subscription": 0,
}
});
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
} }
#[test] #[test]
@ -849,10 +870,9 @@ pub(crate) mod tests {
let (subscriber, _id_receiver, transport_receiver) = let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("slotNotification"); Subscriber::new_test("slotNotification");
let sub_id = SubscriptionId::Number(0 as u64); let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(&exit); let subscriptions = RpcSubscriptions::new(&exit);
subscriptions.add_slot_subscription(&sub_id, &sink); subscriptions.add_slot_subscription(sub_id.clone(), subscriber);
assert!(subscriptions assert!(subscriptions
.slot_subscriptions .slot_subscriptions
@ -885,27 +905,29 @@ pub(crate) mod tests {
#[test] #[test]
fn test_add_and_remove_subscription() { fn test_add_and_remove_subscription() {
let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("notification");
let sink = subscriber
.assign_id(SubscriptionId::String("test".to_string()))
.unwrap();
let mut subscriptions: HashMap<u64, HashMap<SubscriptionId, (Sink<()>, Confirmations)>> = let mut subscriptions: HashMap<u64, HashMap<SubscriptionId, (Sink<()>, Confirmations)>> =
HashMap::new(); HashMap::new();
let num_keys = 5; let num_keys = 5;
let mut next_id: u64 = 0; for key in 0..num_keys {
for _ in 0..num_keys { let (subscriber, _id_receiver, _transport_receiver) =
let key = next_id; Subscriber::new_test("notification");
let sub_id = SubscriptionId::Number(next_id); let sub_id = SubscriptionId::Number(key);
add_subscription(&mut subscriptions, &key, None, &sub_id, &sink.clone()); add_subscription(&mut subscriptions, key, None, sub_id, subscriber);
next_id += 1;
} }
// Add another subscription to the "0" key // Add another subscription to the "0" key
let sub_id = SubscriptionId::Number(next_id); let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("notification");
add_subscription(&mut subscriptions, &0, None, &sub_id, &sink.clone()); let extra_sub_id = SubscriptionId::Number(num_keys);
add_subscription(
&mut subscriptions,
0,
None,
extra_sub_id.clone(),
subscriber,
);
assert_eq!(subscriptions.len(), num_keys); assert_eq!(subscriptions.len(), num_keys as usize);
assert_eq!(subscriptions.get(&0).unwrap().len(), 2); assert_eq!(subscriptions.get(&0).unwrap().len(), 2);
assert_eq!(subscriptions.get(&1).unwrap().len(), 1); assert_eq!(subscriptions.get(&1).unwrap().len(), 1);
@ -913,18 +935,15 @@ pub(crate) mod tests {
remove_subscription(&mut subscriptions, &SubscriptionId::Number(0)), remove_subscription(&mut subscriptions, &SubscriptionId::Number(0)),
true true
); );
assert_eq!(subscriptions.len(), num_keys); assert_eq!(subscriptions.len(), num_keys as usize);
assert_eq!(subscriptions.get(&0).unwrap().len(), 1); assert_eq!(subscriptions.get(&0).unwrap().len(), 1);
assert_eq!( assert_eq!(
remove_subscription(&mut subscriptions, &SubscriptionId::Number(0)), remove_subscription(&mut subscriptions, &SubscriptionId::Number(0)),
false false
); );
assert_eq!( assert_eq!(remove_subscription(&mut subscriptions, &extra_sub_id), true);
remove_subscription(&mut subscriptions, &SubscriptionId::Number(next_id)), assert_eq!(subscriptions.len(), (num_keys - 1) as usize);
true
);
assert_eq!(subscriptions.len(), num_keys - 1);
assert!(subscriptions.get(&0).is_none()); assert!(subscriptions.get(&0).is_none());
} }
} }