diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index a7a9fc3b01..7eb138ed40 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -142,10 +142,12 @@ impl RpcSolPubSub for RpcSolPubSubImpl { let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let sub_id = SubscriptionId::Number(id as u64); info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - - self.subscriptions - .add_account_subscription(&pubkey, confirmations, &sub_id, &sink) + self.subscriptions.add_account_subscription( + pubkey, + confirmations, + sub_id, + subscriber, + ) } Err(e) => subscriber.reject(e).unwrap(), } @@ -180,10 +182,12 @@ impl RpcSolPubSub for RpcSolPubSubImpl { let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let sub_id = SubscriptionId::Number(id as u64); info!("program_subscribe: account={:?} id={:?}", pubkey, sub_id); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - - self.subscriptions - .add_program_subscription(&pubkey, confirmations, &sub_id, &sink) + self.subscriptions.add_program_subscription( + pubkey, + confirmations, + sub_id, + subscriber, + ) } Err(e) => subscriber.reject(e).unwrap(), } @@ -222,13 +226,11 @@ impl RpcSolPubSub for RpcSolPubSubImpl { "signature_subscribe: signature={:?} id={:?}", signature, sub_id ); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - self.subscriptions.add_signature_subscription( - &signature, + signature, confirmations, - &sub_id, - &sink, + sub_id, + subscriber, ); } Err(e) => subscriber.reject(e).unwrap(), @@ -257,9 +259,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let sub_id = SubscriptionId::Number(id as u64); info!("slot_subscribe: id={:?}", sub_id); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - - self.subscriptions.add_slot_subscription(&sub_id, &sink); + self.subscriptions.add_slot_subscription(sub_id, subscriber); } fn slot_unsubscribe(&self, _meta: Option, id: SubscriptionId) -> Result { diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 7bc6cb6d47..649c9f6b0d 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -2,7 +2,10 @@ use core::hash::Hash; use jsonrpc_core::futures::Future; -use jsonrpc_pubsub::{typed::Sink, SubscriptionId}; +use jsonrpc_pubsub::{ + typed::{Sink, Subscriber}, + SubscriptionId, +}; use serde::Serialize; use solana_client::rpc_response::{Response, RpcAccount, RpcKeyedAccount, RpcResponseContext}; use solana_ledger::bank_forks::BankForks; @@ -64,27 +67,28 @@ type RpcSlotSubscriptions = RwLock>>; fn add_subscription( subscriptions: &mut HashMap, Confirmations)>>, - hashmap_key: &K, + hashmap_key: K, confirmations: Option, - sub_id: &SubscriptionId, - sink: &Sink, + sub_id: SubscriptionId, + subscriber: Subscriber, ) where - K: Eq + Hash + Clone + Copy, + K: Eq + Hash, S: Clone, { + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let confirmations = confirmations.unwrap_or(0); let confirmations = if confirmations > MAX_LOCKOUT_HISTORY { MAX_LOCKOUT_HISTORY } else { confirmations }; - if let Some(current_hashmap) = subscriptions.get_mut(hashmap_key) { - current_hashmap.insert(sub_id.clone(), (sink.clone(), confirmations)); + if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) { + current_hashmap.insert(sub_id, (sink, confirmations)); return; } let mut hashmap = HashMap::new(); - hashmap.insert(sub_id.clone(), (sink.clone(), confirmations)); - subscriptions.insert(*hashmap_key, hashmap); + hashmap.insert(sub_id, (sink, confirmations)); + subscriptions.insert(hashmap_key, hashmap); } fn remove_subscription( @@ -92,13 +96,13 @@ fn remove_subscription( sub_id: &SubscriptionId, ) -> bool where - K: Eq + Hash + Clone + Copy, + K: Eq + Hash, S: Clone, { let mut found = false; subscriptions.retain(|_, v| { v.retain(|k, _| { - let retain = *k != *sub_id; + let retain = k != sub_id; if !retain { found = true; } @@ -358,13 +362,19 @@ impl RpcSubscriptions { pub fn add_account_subscription( &self, - pubkey: &Pubkey, + pubkey: Pubkey, confirmations: Option, - sub_id: &SubscriptionId, - sink: &Sink>, + sub_id: SubscriptionId, + subscriber: Subscriber>, ) { let mut subscriptions = self.account_subscriptions.write().unwrap(); - add_subscription(&mut subscriptions, pubkey, confirmations, sub_id, sink); + add_subscription( + &mut subscriptions, + pubkey, + confirmations, + sub_id, + subscriber, + ); } pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool { @@ -374,13 +384,19 @@ impl RpcSubscriptions { pub fn add_program_subscription( &self, - program_id: &Pubkey, + program_id: Pubkey, confirmations: Option, - sub_id: &SubscriptionId, - sink: &Sink>, + sub_id: SubscriptionId, + subscriber: Subscriber>, ) { let mut subscriptions = self.program_subscriptions.write().unwrap(); - add_subscription(&mut subscriptions, program_id, confirmations, sub_id, sink); + add_subscription( + &mut subscriptions, + program_id, + confirmations, + sub_id, + subscriber, + ); } pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool { @@ -390,13 +406,19 @@ impl RpcSubscriptions { pub fn add_signature_subscription( &self, - signature: &Signature, + signature: Signature, confirmations: Option, - sub_id: &SubscriptionId, - sink: &Sink>>, + sub_id: SubscriptionId, + subscriber: Subscriber>>, ) { let mut subscriptions = self.signature_subscriptions.write().unwrap(); - add_subscription(&mut subscriptions, signature, confirmations, sub_id, sink); + add_subscription( + &mut subscriptions, + signature, + confirmations, + sub_id, + subscriber, + ); } pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool { @@ -410,9 +432,10 @@ impl RpcSubscriptions { self.enqueue_notification(NotificationEntry::Bank((current_slot, bank_forks.clone()))); } - pub fn add_slot_subscription(&self, sub_id: &SubscriptionId, sink: &Sink) { + pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let mut subscriptions = self.slot_subscriptions.write().unwrap(); - subscriptions.insert(sub_id.clone(), sink.clone()); + subscriptions.insert(sub_id, sink); } pub fn remove_slot_subscription(&self, id: &SubscriptionId) -> bool { @@ -604,10 +627,9 @@ pub(crate) mod tests { let (subscriber, _id_receiver, transport_receiver) = Subscriber::new_test("accountNotification"); let sub_id = SubscriptionId::Number(0 as u64); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new(&exit); - subscriptions.add_account_subscription(&alice.pubkey(), None, &sub_id, &sink); + subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber); assert!(subscriptions .account_subscriptions @@ -674,10 +696,14 @@ pub(crate) mod tests { let (subscriber, _id_receiver, transport_receiver) = Subscriber::new_test("programNotification"); let sub_id = SubscriptionId::Number(0 as u64); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new(&exit); - subscriptions.add_program_subscription(&solana_budget_program::id(), None, &sub_id, &sink); + subscriptions.add_program_subscription( + solana_budget_program::id(), + None, + sub_id.clone(), + subscriber, + ); assert!(subscriptions .program_subscriptions @@ -754,40 +780,62 @@ pub(crate) mod tests { let bank_forks = Arc::new(RwLock::new(bank_forks)); - let (subscriber, _id_receiver, transport_receiver) = - Subscriber::new_test("signatureNotification"); - let sink = subscriber.assign_id(SubscriptionId::Number(0)).unwrap(); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new(&exit); + + let (past_bank_sub, _id_receiver, past_bank_recv) = + Subscriber::new_test("signatureNotification"); + let (processed_sub, _id_receiver, processed_recv) = + Subscriber::new_test("signatureNotification"); subscriptions.add_signature_subscription( - &past_bank_tx.signatures[0], + past_bank_tx.signatures[0], Some(0), - &SubscriptionId::Number(1 as u64), - &sink.clone(), + SubscriptionId::Number(1 as u64), + Subscriber::new_test("signatureNotification").0, ); subscriptions.add_signature_subscription( - &processed_tx.signatures[0], - Some(0), - &SubscriptionId::Number(2 as u64), - &sink.clone(), + past_bank_tx.signatures[0], + Some(1), + SubscriptionId::Number(2 as u64), + past_bank_sub, ); subscriptions.add_signature_subscription( - &unprocessed_tx.signatures[0], + processed_tx.signatures[0], Some(0), - &SubscriptionId::Number(3 as u64), - &sink.clone(), + SubscriptionId::Number(3 as u64), + processed_sub, + ); + subscriptions.add_signature_subscription( + unprocessed_tx.signatures[0], + Some(0), + SubscriptionId::Number(4 as u64), + Subscriber::new_test("signatureNotification").0, ); { let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); - assert!(sig_subs.contains_key(&past_bank_tx.signatures[0])); + assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 2); assert!(sig_subs.contains_key(&unprocessed_tx.signatures[0])); assert!(sig_subs.contains_key(&processed_tx.signatures[0])); } subscriptions.notify_subscribers(1, &bank_forks); - let response = robust_poll_or_panic(transport_receiver); let expected_res: Option> = Some(Ok(())); + + let expected = json!({ + "jsonrpc": "2.0", + "method": "signatureNotification", + "params": { + "result": { + "context": { "slot": 0 }, + "value": expected_res, + }, + "subscription": 2, + } + }); + let response = robust_poll_or_panic(past_bank_recv); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + let expected = json!({ "jsonrpc": "2.0", "method": "signatureNotification", @@ -796,9 +844,10 @@ pub(crate) mod tests { "context": { "slot": 1 }, "value": expected_res, }, - "subscription": 0, + "subscription": 3, } }); + let response = robust_poll_or_panic(processed_recv); assert_eq!(serde_json::to_string(&expected).unwrap(), response); let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); @@ -814,34 +863,6 @@ pub(crate) mod tests { sig_subs.get(&unprocessed_tx.signatures[0]).unwrap().len(), 1 ); - - let (subscriber, _id_receiver, transport_receiver) = - Subscriber::new_test("signatureNotification"); - let sink = subscriber.assign_id(SubscriptionId::Number(0)).unwrap(); - let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new(&exit); - - subscriptions.add_signature_subscription( - &past_bank_tx.signatures[0], - Some(1), - &SubscriptionId::Number(1 as u64), - &sink.clone(), - ); - subscriptions.notify_subscribers(1, &bank_forks); - let response = robust_poll_or_panic(transport_receiver); - let expected_res: Option> = Some(Ok(())); - let expected = json!({ - "jsonrpc": "2.0", - "method": "signatureNotification", - "params": { - "result": { - "context": { "slot": 0 }, - "value": expected_res, - }, - "subscription": 0, - } - }); - assert_eq!(serde_json::to_string(&expected).unwrap(), response); } #[test] @@ -849,10 +870,9 @@ pub(crate) mod tests { let (subscriber, _id_receiver, transport_receiver) = Subscriber::new_test("slotNotification"); let sub_id = SubscriptionId::Number(0 as u64); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = RpcSubscriptions::new(&exit); - subscriptions.add_slot_subscription(&sub_id, &sink); + subscriptions.add_slot_subscription(sub_id.clone(), subscriber); assert!(subscriptions .slot_subscriptions @@ -885,27 +905,29 @@ pub(crate) mod tests { #[test] fn test_add_and_remove_subscription() { - let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("notification"); - let sink = subscriber - .assign_id(SubscriptionId::String("test".to_string())) - .unwrap(); let mut subscriptions: HashMap, Confirmations)>> = HashMap::new(); let num_keys = 5; - let mut next_id: u64 = 0; - for _ in 0..num_keys { - let key = next_id; - let sub_id = SubscriptionId::Number(next_id); - add_subscription(&mut subscriptions, &key, None, &sub_id, &sink.clone()); - next_id += 1; + for key in 0..num_keys { + let (subscriber, _id_receiver, _transport_receiver) = + Subscriber::new_test("notification"); + let sub_id = SubscriptionId::Number(key); + add_subscription(&mut subscriptions, key, None, sub_id, subscriber); } // Add another subscription to the "0" key - let sub_id = SubscriptionId::Number(next_id); - add_subscription(&mut subscriptions, &0, None, &sub_id, &sink.clone()); + let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("notification"); + let extra_sub_id = SubscriptionId::Number(num_keys); + add_subscription( + &mut subscriptions, + 0, + None, + extra_sub_id.clone(), + subscriber, + ); - assert_eq!(subscriptions.len(), num_keys); + assert_eq!(subscriptions.len(), num_keys as usize); assert_eq!(subscriptions.get(&0).unwrap().len(), 2); assert_eq!(subscriptions.get(&1).unwrap().len(), 1); @@ -913,18 +935,15 @@ pub(crate) mod tests { remove_subscription(&mut subscriptions, &SubscriptionId::Number(0)), true ); - assert_eq!(subscriptions.len(), num_keys); + assert_eq!(subscriptions.len(), num_keys as usize); assert_eq!(subscriptions.get(&0).unwrap().len(), 1); assert_eq!( remove_subscription(&mut subscriptions, &SubscriptionId::Number(0)), false ); - assert_eq!( - remove_subscription(&mut subscriptions, &SubscriptionId::Number(next_id)), - true - ); - assert_eq!(subscriptions.len(), num_keys - 1); + assert_eq!(remove_subscription(&mut subscriptions, &extra_sub_id), true); + assert_eq!(subscriptions.len(), (num_keys - 1) as usize); assert!(subscriptions.get(&0).is_none()); } }