From 6a81f9e443dd0bf9b1d62a1c2be6357f35e7fd12 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Wed, 6 Mar 2019 15:31:58 -0700 Subject: [PATCH] Add program subscriptions to rpc --- core/src/rpc_pubsub.rs | 62 ++++++++++++++++++++++++ core/src/rpc_subscriptions.rs | 90 +++++++++++++++++++++++++++++++++++ 2 files changed, 152 insertions(+) diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 204a764b4f..7ff26f604e 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -34,6 +34,23 @@ pub trait RpcSolPubSub { )] fn account_unsubscribe(&self, _: Option, _: SubscriptionId) -> Result; + // Get notification every time account userdata owned by a particular program is changed + // Accepts pubkey parameter as base-58 encoded string + #[pubsub( + subscription = "programNotification", + subscribe, + name = "programSubscribe" + )] + fn program_subscribe(&self, _: Self::Metadata, _: Subscriber<(String, Account)>, _: String); + + // Unsubscribe from account notification subscription. + #[pubsub( + subscription = "programNotification", + unsubscribe, + name = "programUnsubscribe" + )] + fn program_unsubscribe(&self, _: Option, _: SubscriptionId) -> Result; + // Get notification when signature is verified // Accepts signature parameter as base-58 encoded string #[pubsub( @@ -113,6 +130,51 @@ impl RpcSolPubSub for RpcSolPubSubImpl { } } + fn program_subscribe( + &self, + _meta: Self::Metadata, + subscriber: Subscriber<(String, Account)>, + pubkey_str: String, + ) { + let pubkey_vec = bs58::decode(pubkey_str).into_vec().unwrap(); + if pubkey_vec.len() != mem::size_of::() { + subscriber + .reject(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Invalid pubkey provided".into(), + data: None, + }) + .unwrap(); + return; + } + let pubkey = Pubkey::new(&pubkey_vec); + + let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst); + let sub_id = SubscriptionId::Number(id as u64); + info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id); + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + + self.subscriptions + .add_program_subscription(&pubkey, &sub_id, &sink) + } + + fn program_unsubscribe( + &self, + _meta: Option, + id: SubscriptionId, + ) -> Result { + info!("account_unsubscribe: id={:?}", id); + if self.subscriptions.remove_program_subscription(&id) { + Ok(true) + } else { + Err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Subscription id does not exist".into(), + data: None, + }) + } + } + fn signature_subscribe( &self, _meta: Self::Metadata, diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 5420d229e9..348d77e87b 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -1,6 +1,7 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request use crate::rpc_status::RpcSignatureStatus; +use bs58; use core::hash::Hash; use jsonrpc_core::futures::Future; use jsonrpc_pubsub::typed::Sink; @@ -13,6 +14,8 @@ use std::collections::HashMap; use std::sync::RwLock; type RpcAccountSubscriptions = RwLock>>>; +type RpcProgramSubscriptions = + RwLock>>>; type RpcSignatureSubscriptions = RwLock>>>; @@ -57,6 +60,7 @@ where pub struct RpcSubscriptions { account_subscriptions: RpcAccountSubscriptions, + program_subscriptions: RpcProgramSubscriptions, signature_subscriptions: RpcSignatureSubscriptions, } @@ -64,6 +68,7 @@ impl Default for RpcSubscriptions { fn default() -> Self { RpcSubscriptions { account_subscriptions: RpcAccountSubscriptions::default(), + program_subscriptions: RpcProgramSubscriptions::default(), signature_subscriptions: RpcSignatureSubscriptions::default(), } } @@ -79,6 +84,17 @@ impl RpcSubscriptions { } } + pub fn check_program(&self, program_id: &Pubkey, pubkey: &Pubkey, account: &Account) { + let subscriptions = self.program_subscriptions.write().unwrap(); + if let Some(hashmap) = subscriptions.get(program_id) { + for (_bank_sub_id, sink) in hashmap.iter() { + sink.notify(Ok((bs58::encode(pubkey).into_string(), account.clone()))) + .wait() + .unwrap(); + } + } + } + pub fn check_signature(&self, signature: &Signature, bank_error: &bank::Result<()>) { let status = match bank_error { Ok(_) => RpcSignatureStatus::Confirmed, @@ -111,6 +127,21 @@ impl RpcSubscriptions { remove_subscription(&mut subscriptions, id) } + pub fn add_program_subscription( + &self, + program_id: &Pubkey, + sub_id: &SubscriptionId, + sink: &Sink<(String, Account)>, + ) { + let mut subscriptions = self.program_subscriptions.write().unwrap(); + add_subscription(&mut subscriptions, program_id, sub_id, sink); + } + + pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool { + let mut subscriptions = self.program_subscriptions.write().unwrap(); + remove_subscription(&mut subscriptions, id) + } + pub fn add_signature_subscription( &self, signature: &Signature, @@ -139,6 +170,19 @@ impl RpcSubscriptions { } } + let programs: Vec<_> = { + let subs = self.program_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for program_id in &programs { + let accounts = &bank.get_program_accounts_modified_since_parent(program_id); + if !accounts.is_empty() { + for (pubkey, account) in accounts.iter() { + self.check_program(program_id, pubkey, account); + } + } + } + let signatures: Vec<_> = { let subs = self.signature_subscriptions.read().unwrap(); subs.keys().cloned().collect() @@ -205,6 +249,52 @@ mod tests { .unwrap() .contains_key(&alice.pubkey())); } + + #[test] + fn test_check_program_subscribe() { + let (genesis_block, mint_keypair) = GenesisBlock::new(100); + let bank = Bank::new(&genesis_block); + let alice = Keypair::new(); + let blockhash = bank.last_blockhash(); + let tx = SystemTransaction::new_program_account( + &mint_keypair, + alice.pubkey(), + blockhash, + 1, + 16, + solana_budget_api::id(), + 0, + ); + bank.process_transaction(&tx).unwrap(); + + let (subscriber, _id_receiver, mut transport_receiver) = + Subscriber::new_test("programNotification"); + let sub_id = SubscriptionId::Number(0 as u64); + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let subscriptions = RpcSubscriptions::default(); + subscriptions.add_program_subscription(&solana_budget_api::id(), &sub_id, &sink); + + assert!(subscriptions + .program_subscriptions + .read() + .unwrap() + .contains_key(&solana_budget_api::id())); + + let account = bank.get_account(&alice.pubkey()).unwrap(); + subscriptions.check_program(&solana_budget_api::id(), &alice.pubkey(), &account); + let string = transport_receiver.poll(); + if let Async::Ready(Some(response)) = string.unwrap() { + let expected = format!(r#"{{"jsonrpc":"2.0","method":"programNotification","params":{{"result":["{:?}",{{"executable":false,"lamports":1,"owner":[129,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"userdata":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}}],"subscription":0}}}}"#, alice.pubkey()); + assert_eq!(expected, response); + } + + subscriptions.remove_program_subscription(&sub_id); + assert!(!subscriptions + .program_subscriptions + .read() + .unwrap() + .contains_key(&solana_budget_api::id())); + } #[test] fn test_check_signature_subscribe() { let (genesis_block, mint_keypair) = GenesisBlock::new(100);