//! The `pubsub` module implements a threaded subscription service on client RPC request use crate::rpc_subscriptions::{Confirmations, RpcSubscriptions, SlotInfo}; use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; use solana_client::rpc_response::{Response as RpcResponse, RpcAccount, RpcKeyedAccount}; use solana_sdk::{pubkey::Pubkey, signature::Signature, transaction}; use std::sync::{atomic, Arc}; // Suppress needless_return due to // https://github.com/paritytech/jsonrpc/blob/2d38e6424d8461cdf72e78425ce67d51af9c6586/derive/src/lib.rs#L204 // Once https://github.com/paritytech/jsonrpc/issues/418 is resolved, try to remove this clippy allow #[allow(clippy::needless_return)] #[rpc] pub trait RpcSolPubSub { type Metadata; // Get notification every time account data is changed // Accepts pubkey parameter as base-58 encoded string #[pubsub( subscription = "accountNotification", subscribe, name = "accountSubscribe" )] fn account_subscribe( &self, meta: Self::Metadata, subscriber: Subscriber>, pubkey_str: String, confirmations: Option, ); // Unsubscribe from account notification subscription. #[pubsub( subscription = "accountNotification", unsubscribe, name = "accountUnsubscribe" )] fn account_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; // Get notification every time account data owned by a particular program is changed // Accepts pubkey parameter as base-58 encoded string #[pubsub( subscription = "programNotification", subscribe, name = "programSubscribe" )] fn program_subscribe( &self, meta: Self::Metadata, subscriber: Subscriber>, pubkey_str: String, confirmations: Option, ); // Unsubscribe from account notification subscription. #[pubsub( subscription = "programNotification", unsubscribe, name = "programUnsubscribe" )] fn program_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; // Get notification when signature is verified // Accepts signature parameter as base-58 encoded string #[pubsub( subscription = "signatureNotification", subscribe, name = "signatureSubscribe" )] fn signature_subscribe( &self, meta: Self::Metadata, subscriber: Subscriber>>, signature_str: String, confirmations: Option, ); // Unsubscribe from signature notification subscription. #[pubsub( subscription = "signatureNotification", unsubscribe, name = "signatureUnsubscribe" )] fn signature_unsubscribe( &self, meta: Option, id: SubscriptionId, ) -> Result; // Get notification when slot is encountered #[pubsub(subscription = "slotNotification", subscribe, name = "slotSubscribe")] fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber); // Unsubscribe from slot notification subscription. #[pubsub( subscription = "slotNotification", unsubscribe, name = "slotUnsubscribe" )] fn slot_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; } #[derive(Default)] pub struct RpcSolPubSubImpl { uid: Arc, subscriptions: Arc, } impl RpcSolPubSubImpl { pub fn new(subscriptions: Arc) -> Self { let uid = Arc::new(atomic::AtomicUsize::default()); Self { uid, subscriptions } } } use std::str::FromStr; fn param(param_str: &str, thing: &str) -> Result { param_str.parse::().map_err(|_e| Error { code: ErrorCode::InvalidParams, message: format!("Invalid Request: Invalid {} provided", thing), data: None, }) } impl RpcSolPubSub for RpcSolPubSubImpl { type Metadata = Arc; fn account_subscribe( &self, _meta: Self::Metadata, subscriber: Subscriber>, pubkey_str: String, confirmations: Option, ) { match param::(&pubkey_str, "pubkey") { Ok(pubkey) => { let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let sub_id = SubscriptionId::Number(id as u64); info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id); self.subscriptions.add_account_subscription( pubkey, confirmations, sub_id, subscriber, ) } Err(e) => subscriber.reject(e).unwrap(), } } fn account_unsubscribe( &self, _meta: Option, id: SubscriptionId, ) -> Result { info!("account_unsubscribe: id={:?}", id); if self.subscriptions.remove_account_subscription(&id) { Ok(true) } else { Err(Error { code: ErrorCode::InvalidParams, message: "Invalid Request: Subscription id does not exist".into(), data: None, }) } } fn program_subscribe( &self, _meta: Self::Metadata, subscriber: Subscriber>, pubkey_str: String, confirmations: Option, ) { match param::(&pubkey_str, "pubkey") { Ok(pubkey) => { let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let sub_id = SubscriptionId::Number(id as u64); info!("program_subscribe: account={:?} id={:?}", pubkey, sub_id); self.subscriptions.add_program_subscription( pubkey, confirmations, sub_id, subscriber, ) } Err(e) => subscriber.reject(e).unwrap(), } } fn program_unsubscribe( &self, _meta: Option, id: SubscriptionId, ) -> Result { info!("program_unsubscribe: id={:?}", id); if self.subscriptions.remove_program_subscription(&id) { Ok(true) } else { Err(Error { code: ErrorCode::InvalidParams, message: "Invalid Request: Subscription id does not exist".into(), data: None, }) } } fn signature_subscribe( &self, _meta: Self::Metadata, subscriber: Subscriber>>, signature_str: String, confirmations: Option, ) { info!("signature_subscribe"); match param::(&signature_str, "signature") { Ok(signature) => { let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let sub_id = SubscriptionId::Number(id as u64); info!( "signature_subscribe: signature={:?} id={:?}", signature, sub_id ); self.subscriptions.add_signature_subscription( signature, confirmations, sub_id, subscriber, ); } Err(e) => subscriber.reject(e).unwrap(), } } fn signature_unsubscribe( &self, _meta: Option, id: SubscriptionId, ) -> Result { info!("signature_unsubscribe"); if self.subscriptions.remove_signature_subscription(&id) { Ok(true) } else { Err(Error { code: ErrorCode::InvalidParams, message: "Invalid Request: Subscription id does not exist".into(), data: None, }) } } fn slot_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber) { info!("slot_subscribe"); let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); let sub_id = SubscriptionId::Number(id as u64); info!("slot_subscribe: id={:?}", sub_id); self.subscriptions.add_slot_subscription(sub_id, subscriber); } fn slot_unsubscribe(&self, _meta: Option, id: SubscriptionId) -> Result { info!("slot_unsubscribe"); if self.subscriptions.remove_slot_subscription(&id) { Ok(true) } else { Err(Error { code: ErrorCode::InvalidParams, message: "Invalid Request: Subscription id does not exist".into(), data: None, }) } } } #[cfg(test)] mod tests { use super::*; use crate::rpc_subscriptions::tests::robust_poll_or_panic; use jsonrpc_core::{futures::sync::mpsc, Response}; use jsonrpc_pubsub::{PubSubHandler, Session}; use solana_budget_program::{self, budget_instruction}; use solana_ledger::bank_forks::BankForks; use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use solana_runtime::bank::Bank; use solana_sdk::{ pubkey::Pubkey, signature::{Keypair, Signer}, system_program, system_transaction, transaction::{self, Transaction}, }; use std::{sync::RwLock, thread::sleep, time::Duration}; fn process_transaction_and_notify( bank_forks: &Arc>, tx: &Transaction, subscriptions: &RpcSubscriptions, ) -> transaction::Result<()> { bank_forks .write() .unwrap() .get(0) .unwrap() .process_transaction(tx)?; subscriptions.notify_subscribers(0, &bank_forks); Ok(()) } fn create_session() -> Arc { Arc::new(Session::new(mpsc::channel(1).0)) } #[test] fn test_signature_subscribe() { let GenesisConfigInfo { genesis_config, mint_keypair: alice, .. } = create_genesis_config(10_000); let bob = Keypair::new(); let bob_pubkey = bob.pubkey(); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let rpc = RpcSolPubSubImpl::default(); // Test signature subscriptions let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("signatureNotification"); rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string(), None); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); // Test signature confirmation notification let response = robust_poll_or_panic(receiver); let expected_res: Option> = Some(Ok(())); let expected = json!({ "jsonrpc": "2.0", "method": "signatureNotification", "params": { "result": { "context": { "slot": 0 }, "value": expected_res, }, "subscription": 0, } }); assert_eq!(serde_json::to_string(&expected).unwrap(), response); } #[test] fn test_signature_unsubscribe() { let GenesisConfigInfo { genesis_config, mint_keypair: alice, .. } = create_genesis_config(10_000); let bob_pubkey = Pubkey::new_rand(); let bank = Bank::new(&genesis_config); let arc_bank = Arc::new(bank); let blockhash = arc_bank.last_blockhash(); let session = create_session(); let mut io = PubSubHandler::default(); let rpc = RpcSolPubSubImpl::default(); io.extend_with(rpc.to_delegate()); let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash); let req = format!( r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#, tx.signatures[0].to_string() ); let _res = io.handle_request_sync(&req, session.clone()); let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"signatureUnsubscribe","params":[0]}}"#); let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#); let expected: Response = serde_json::from_str(&expected).unwrap(); let result: Response = serde_json::from_str(&res.unwrap()).unwrap(); assert_eq!(expected, result); // Test bad parameter let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"signatureUnsubscribe","params":[1]}}"#); let res = io.handle_request_sync(&req, session.clone()); let expected = format!( r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"# ); let expected: Response = serde_json::from_str(&expected).unwrap(); let result: Response = serde_json::from_str(&res.unwrap()).unwrap(); assert_eq!(expected, result); } #[test] fn test_account_subscribe() { let GenesisConfigInfo { mut genesis_config, mint_keypair: alice, .. } = create_genesis_config(10_000); // This test depends on the budget program genesis_config .native_instruction_processors .push(solana_budget_program!()); let bob_pubkey = Pubkey::new_rand(); let witness = Keypair::new(); let contract_funds = Keypair::new(); let contract_state = Keypair::new(); let budget_program_id = solana_budget_program::id(); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let rpc = RpcSolPubSubImpl::default(); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe( session, subscriber, contract_state.pubkey().to_string(), None, ); let tx = system_transaction::transfer(&alice, &contract_funds.pubkey(), 51, blockhash); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); let ixs = budget_instruction::when_signed( &contract_funds.pubkey(), &bob_pubkey, &contract_state.pubkey(), &witness.pubkey(), None, 51, ); let tx = Transaction::new_signed_instructions( &[&contract_funds, &contract_state], ixs, blockhash, ); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); // Test signature confirmation notification #1 let expected_data = bank_forks .read() .unwrap() .get(0) .unwrap() .get_account(&contract_state.pubkey()) .unwrap() .data; let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", "params": { "result": { "context": { "slot": 0 }, "value": { "owner": budget_program_id.to_string(), "lamports": 51, "data": bs58::encode(expected_data).into_string(), "executable": false, "rentEpoch": 1, }, }, "subscription": 0, } }); let response = robust_poll_or_panic(receiver); assert_eq!(serde_json::to_string(&expected).unwrap(), response); let tx = system_transaction::transfer(&alice, &witness.pubkey(), 1, blockhash); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); let ix = budget_instruction::apply_signature( &witness.pubkey(), &contract_state.pubkey(), &bob_pubkey, ); let tx = Transaction::new_signed_instructions(&[&witness], vec![ix], blockhash); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); assert_eq!( bank_forks .read() .unwrap() .get(0) .unwrap() .get_account(&contract_state.pubkey()), None ); } #[test] fn test_account_unsubscribe() { let bob_pubkey = Pubkey::new_rand(); let session = create_session(); let mut io = PubSubHandler::default(); let rpc = RpcSolPubSubImpl::default(); io.extend_with(rpc.to_delegate()); let req = format!( r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["{}"]}}"#, bob_pubkey.to_string() ); let _res = io.handle_request_sync(&req, session.clone()); let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"accountUnsubscribe","params":[0]}}"#); let res = io.handle_request_sync(&req, session.clone()); let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#); let expected: Response = serde_json::from_str(&expected).unwrap(); let result: Response = serde_json::from_str(&res.unwrap()).unwrap(); assert_eq!(expected, result); // Test bad parameter let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"accountUnsubscribe","params":[1]}}"#); let res = io.handle_request_sync(&req, session.clone()); let expected = format!( r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"# ); let expected: Response = serde_json::from_str(&expected).unwrap(); let result: Response = serde_json::from_str(&res.unwrap()).unwrap(); assert_eq!(expected, result); } #[test] #[should_panic] fn test_account_confirmations_not_fulfilled() { let GenesisConfigInfo { genesis_config, mint_keypair: alice, .. } = create_genesis_config(10_000); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bob = Keypair::new(); let rpc = RpcSolPubSubImpl::default(); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash); bank_forks .write() .unwrap() .get(0) .unwrap() .process_transaction(&tx) .unwrap(); rpc.subscriptions.notify_subscribers(0, &bank_forks); // allow 200ms for notification thread to wake std::thread::sleep(Duration::from_millis(200)); let _panic = robust_poll_or_panic(receiver); } #[test] fn test_account_confirmations() { let GenesisConfigInfo { genesis_config, mint_keypair: alice, .. } = create_genesis_config(10_000); let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bob = Keypair::new(); let rpc = RpcSolPubSubImpl::default(); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash); bank_forks .write() .unwrap() .get(0) .unwrap() .process_transaction(&tx) .unwrap(); rpc.subscriptions.notify_subscribers(0, &bank_forks); let bank0 = bank_forks.read().unwrap()[0].clone(); let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); rpc.subscriptions.notify_subscribers(1, &bank_forks); let bank1 = bank_forks.read().unwrap()[1].clone(); let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); bank_forks.write().unwrap().insert(bank2); rpc.subscriptions.notify_subscribers(2, &bank_forks); let expected = json!({ "jsonrpc": "2.0", "method": "accountNotification", "params": { "result": { "context": { "slot": 0 }, "value": { "owner": system_program::id().to_string(), "lamports": 100, "data": "", "executable": false, "rentEpoch": 1, }, }, "subscription": 0, } }); let response = robust_poll_or_panic(receiver); assert_eq!(serde_json::to_string(&expected).unwrap(), response); } #[test] fn test_slot_subscribe() { let rpc = RpcSolPubSubImpl::default(); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification"); rpc.slot_subscribe(session, subscriber); rpc.subscriptions.notify_slot(0, 0, 0); // Test slot confirmation notification let response = robust_poll_or_panic(receiver); let expected_res = SlotInfo { parent: 0, slot: 0, root: 0, }; let expected_res_str = serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); let expected = format!( r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str ); assert_eq!(expected, response); } #[test] fn test_slot_unsubscribe() { let rpc = RpcSolPubSubImpl::default(); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("slotNotification"); rpc.slot_subscribe(session, subscriber); rpc.subscriptions.notify_slot(0, 0, 0); let response = robust_poll_or_panic(receiver); let expected_res = SlotInfo { parent: 0, slot: 0, root: 0, }; let expected_res_str = serde_json::to_string(&serde_json::to_value(expected_res).unwrap()).unwrap(); let expected = format!( r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{},"subscription":0}}}}"#, expected_res_str ); assert_eq!(expected, response); let session = create_session(); assert!(rpc .slot_unsubscribe(Some(session), SubscriptionId::Number(42)) .is_err()); let session = create_session(); assert!(rpc .slot_unsubscribe(Some(session), SubscriptionId::Number(0)) .is_ok()); } }