From 50d3fa7437d19cc19321d34caf9f572de9703feb Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sun, 17 Feb 2019 09:38:36 -0700 Subject: [PATCH] Move RpcSubscriptions into its own module --- src/bank.rs | 2 +- src/lib.rs | 1 + src/rpc_pubsub.rs | 197 +----------------------------------- src/rpc_subscriptions.rs | 212 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 217 insertions(+), 195 deletions(-) create mode 100644 src/rpc_subscriptions.rs diff --git a/src/bank.rs b/src/bank.rs index 670f380152..af41abac5d 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -8,7 +8,7 @@ use crate::counter::Counter; use crate::genesis_block::GenesisBlock; use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS}; use crate::poh_service::NUM_TICKS_PER_SECOND; -use crate::rpc_pubsub::RpcSubscriptions; +use crate::rpc_subscriptions::RpcSubscriptions; use crate::status_cache::StatusCache; use bincode::deserialize; use log::Level; diff --git a/src/lib.rs b/src/lib.rs index 4c89a480df..71c08aec0c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,6 +62,7 @@ pub mod rpc; pub mod rpc_mock; pub mod rpc_pubsub; pub mod rpc_request; +pub mod rpc_subscriptions; pub mod service; pub mod sigverify; pub mod sigverify_stage; diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index dfdf1090dc..be90257171 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -1,20 +1,19 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::bank; -use crate::bank::{Bank, BankError}; +use crate::bank::Bank; use crate::rpc::RpcSignatureStatus; +use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use bs58; use jsonrpc_core::futures::Future; use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; -use jsonrpc_pubsub::typed::{Sink, Subscriber}; +use jsonrpc_pubsub::typed::Subscriber; use jsonrpc_pubsub::{PubSubHandler, Session, SubscriptionId}; use jsonrpc_ws_server::{RequestContext, ServerBuilder}; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; -use std::collections::HashMap; use std::mem; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; @@ -144,113 +143,6 @@ impl RpcPubSubBank { } } -type RpcAccountSubscriptions = RwLock>>>; -type RpcSignatureSubscriptions = - RwLock>>>; -pub struct RpcSubscriptions { - account_subscriptions: RpcAccountSubscriptions, - signature_subscriptions: RpcSignatureSubscriptions, -} - -impl Default for RpcSubscriptions { - fn default() -> Self { - RpcSubscriptions { - account_subscriptions: RpcAccountSubscriptions::default(), - signature_subscriptions: RpcSignatureSubscriptions::default(), - } - } -} - -impl RpcSubscriptions { - pub fn check_account(&self, pubkey: &Pubkey, account: &Account) { - let subscriptions = self.account_subscriptions.read().unwrap(); - if let Some(hashmap) = subscriptions.get(pubkey) { - for (_bank_sub_id, sink) in hashmap.iter() { - sink.notify(Ok(account.clone())).wait().unwrap(); - } - } - } - - pub fn check_signature(&self, signature: &Signature, bank_error: &bank::Result<()>) { - let status = match bank_error { - Ok(_) => RpcSignatureStatus::Confirmed, - Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse, - Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError, - Err(_) => RpcSignatureStatus::GenericFailure, - }; - - let mut subscriptions = self.signature_subscriptions.write().unwrap(); - if let Some(hashmap) = subscriptions.get(signature) { - for (_bank_sub_id, sink) in hashmap.iter() { - sink.notify(Ok(status)).wait().unwrap(); - } - } - subscriptions.remove(&signature); - } - - pub fn add_account_subscription( - &self, - pubkey: &Pubkey, - sub_id: &SubscriptionId, - sink: &Sink, - ) { - let mut subscriptions = self.account_subscriptions.write().unwrap(); - if let Some(current_hashmap) = subscriptions.get_mut(pubkey) { - current_hashmap.insert(sub_id.clone(), sink.clone()); - return; - } - let mut hashmap = HashMap::new(); - hashmap.insert(sub_id.clone(), sink.clone()); - subscriptions.insert(*pubkey, hashmap); - } - - pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool { - let mut subscriptions = self.account_subscriptions.write().unwrap(); - let mut found = false; - subscriptions.retain(|_, v| { - v.retain(|k, _| { - if *k == *id { - found = true; - } - !found - }); - !v.is_empty() - }); - found - } - - pub fn add_signature_subscription( - &self, - signature: &Signature, - sub_id: &SubscriptionId, - sink: &Sink, - ) { - let mut subscriptions = self.signature_subscriptions.write().unwrap(); - if let Some(current_hashmap) = subscriptions.get_mut(signature) { - current_hashmap.insert(sub_id.clone(), sink.clone()); - return; - } - let mut hashmap = HashMap::new(); - hashmap.insert(sub_id.clone(), sink.clone()); - subscriptions.insert(*signature, hashmap); - } - - pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool { - let mut subscriptions = self.signature_subscriptions.write().unwrap(); - let mut found = false; - subscriptions.retain(|_, v| { - v.retain(|k, _| { - if *k == *id { - found = true; - } - !found - }); - !v.is_empty() - }); - found - } -} - struct RpcSolPubSubImpl { uid: Arc, bank: Arc>, @@ -695,87 +587,4 @@ mod tests { .expect("actual response deserialization"); assert_eq!(expected, result); } - - #[test] - fn test_check_account_subscribe() { - let (genesis_block, mint_keypair) = GenesisBlock::new(100); - let bank = Bank::new(&genesis_block); - let alice = Keypair::new(); - let last_id = bank.last_id(); - let tx = SystemTransaction::new_program_account( - &mint_keypair, - alice.pubkey(), - last_id, - 1, - 16, - budget_program::id(), - 0, - ); - bank.process_transaction(&tx).unwrap(); - - let (subscriber, _id_receiver, mut transport_receiver) = - Subscriber::new_test("accountNotification"); - let sub_id = SubscriptionId::Number(0 as u64); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let subscriptions = RpcSubscriptions::default(); - subscriptions.add_account_subscription(&alice.pubkey(), &sub_id, &sink); - - assert!(subscriptions - .account_subscriptions - .write() - .unwrap() - .contains_key(&alice.pubkey())); - - let account = bank.get_account(&alice.pubkey()).unwrap(); - subscriptions.check_account(&alice.pubkey(), &account); - let string = transport_receiver.poll(); - if let Async::Ready(Some(response)) = string.unwrap() { - let expected = format!(r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"executable":false,"owner":[129,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"tokens":1,"userdata":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"subscription":0}}}}"#); - assert_eq!(expected, response); - } - - subscriptions.remove_account_subscription(&sub_id); - assert!(!subscriptions - .account_subscriptions - .write() - .unwrap() - .contains_key(&alice.pubkey())); - } - #[test] - fn test_check_signature_subscribe() { - let (genesis_block, mint_keypair) = GenesisBlock::new(100); - let bank = Bank::new(&genesis_block); - let alice = Keypair::new(); - let last_id = bank.last_id(); - let tx = SystemTransaction::new_move(&mint_keypair, alice.pubkey(), 20, last_id, 0); - let signature = tx.signatures[0]; - bank.process_transaction(&tx).unwrap(); - - let (subscriber, _id_receiver, mut transport_receiver) = - Subscriber::new_test("signatureNotification"); - let sub_id = SubscriptionId::Number(0 as u64); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let subscriptions = RpcSubscriptions::default(); - subscriptions.add_signature_subscription(&signature, &sub_id, &sink); - - assert!(subscriptions - .signature_subscriptions - .write() - .unwrap() - .contains_key(&signature)); - - subscriptions.check_signature(&signature, &Ok(())); - let string = transport_receiver.poll(); - if let Async::Ready(Some(response)) = string.unwrap() { - let expected = format!(r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":"Confirmed","subscription":0}}}}"#); - assert_eq!(expected, response); - } - - subscriptions.remove_signature_subscription(&sub_id); - assert!(!subscriptions - .signature_subscriptions - .write() - .unwrap() - .contains_key(&signature)); - } } diff --git a/src/rpc_subscriptions.rs b/src/rpc_subscriptions.rs new file mode 100644 index 0000000000..4383d7a3a5 --- /dev/null +++ b/src/rpc_subscriptions.rs @@ -0,0 +1,212 @@ +//! The `pubsub` module implements a threaded subscription service on client RPC request + +use crate::bank::{self, Bank, BankError}; +use crate::rpc::RpcSignatureStatus; +use jsonrpc_core::futures::Future; +use jsonrpc_pubsub::typed::{Sink, Subscriber}; +use jsonrpc_pubsub::SubscriptionId; +use solana_sdk::account::Account; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signature; +use std::collections::HashMap; +use std::sync::RwLock; + +type RpcAccountSubscriptions = RwLock>>>; +type RpcSignatureSubscriptions = + RwLock>>>; +pub struct RpcSubscriptions { + account_subscriptions: RpcAccountSubscriptions, + signature_subscriptions: RpcSignatureSubscriptions, +} + +impl Default for RpcSubscriptions { + fn default() -> Self { + RpcSubscriptions { + account_subscriptions: RpcAccountSubscriptions::default(), + signature_subscriptions: RpcSignatureSubscriptions::default(), + } + } +} + +impl RpcSubscriptions { + pub fn check_account(&self, pubkey: &Pubkey, account: &Account) { + let subscriptions = self.account_subscriptions.read().unwrap(); + if let Some(hashmap) = subscriptions.get(pubkey) { + for (_bank_sub_id, sink) in hashmap.iter() { + sink.notify(Ok(account.clone())).wait().unwrap(); + } + } + } + + pub fn check_signature(&self, signature: &Signature, bank_error: &bank::Result<()>) { + let status = match bank_error { + Ok(_) => RpcSignatureStatus::Confirmed, + Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse, + Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError, + Err(_) => RpcSignatureStatus::GenericFailure, + }; + + let mut subscriptions = self.signature_subscriptions.write().unwrap(); + if let Some(hashmap) = subscriptions.get(signature) { + for (_bank_sub_id, sink) in hashmap.iter() { + sink.notify(Ok(status)).wait().unwrap(); + } + } + subscriptions.remove(&signature); + } + + pub fn add_account_subscription( + &self, + pubkey: &Pubkey, + sub_id: &SubscriptionId, + sink: &Sink, + ) { + let mut subscriptions = self.account_subscriptions.write().unwrap(); + if let Some(current_hashmap) = subscriptions.get_mut(pubkey) { + current_hashmap.insert(sub_id.clone(), sink.clone()); + return; + } + let mut hashmap = HashMap::new(); + hashmap.insert(sub_id.clone(), sink.clone()); + subscriptions.insert(*pubkey, hashmap); + } + + pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool { + let mut subscriptions = self.account_subscriptions.write().unwrap(); + let mut found = false; + subscriptions.retain(|_, v| { + v.retain(|k, _| { + if *k == *id { + found = true; + } + !found + }); + !v.is_empty() + }); + found + } + + pub fn add_signature_subscription( + &self, + signature: &Signature, + sub_id: &SubscriptionId, + sink: &Sink, + ) { + let mut subscriptions = self.signature_subscriptions.write().unwrap(); + if let Some(current_hashmap) = subscriptions.get_mut(signature) { + current_hashmap.insert(sub_id.clone(), sink.clone()); + return; + } + let mut hashmap = HashMap::new(); + hashmap.insert(sub_id.clone(), sink.clone()); + subscriptions.insert(*signature, hashmap); + } + + pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool { + let mut subscriptions = self.signature_subscriptions.write().unwrap(); + let mut found = false; + subscriptions.retain(|_, v| { + v.retain(|k, _| { + if *k == *id { + found = true; + } + !found + }); + !v.is_empty() + }); + found + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::genesis_block::GenesisBlock; + use solana_sdk::budget_program; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::system_transaction::SystemTransaction; + use tokio::prelude::{Async, Stream}; + + #[test] + fn test_check_account_subscribe() { + let (genesis_block, mint_keypair) = GenesisBlock::new(100); + let bank = Bank::new(&genesis_block); + let alice = Keypair::new(); + let last_id = bank.last_id(); + let tx = SystemTransaction::new_program_account( + &mint_keypair, + alice.pubkey(), + last_id, + 1, + 16, + budget_program::id(), + 0, + ); + bank.process_transaction(&tx).unwrap(); + + let (subscriber, _id_receiver, mut transport_receiver) = + Subscriber::new_test("accountNotification"); + let sub_id = SubscriptionId::Number(0 as u64); + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let subscriptions = RpcSubscriptions::default(); + subscriptions.add_account_subscription(&alice.pubkey(), &sub_id, &sink); + + assert!(subscriptions + .account_subscriptions + .write() + .unwrap() + .contains_key(&alice.pubkey())); + + let account = bank.get_account(&alice.pubkey()).unwrap(); + subscriptions.check_account(&alice.pubkey(), &account); + let string = transport_receiver.poll(); + if let Async::Ready(Some(response)) = string.unwrap() { + let expected = format!(r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"executable":false,"owner":[129,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"tokens":1,"userdata":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"subscription":0}}}}"#); + assert_eq!(expected, response); + } + + subscriptions.remove_account_subscription(&sub_id); + assert!(!subscriptions + .account_subscriptions + .write() + .unwrap() + .contains_key(&alice.pubkey())); + } + #[test] + fn test_check_signature_subscribe() { + let (genesis_block, mint_keypair) = GenesisBlock::new(100); + let bank = Bank::new(&genesis_block); + let alice = Keypair::new(); + let last_id = bank.last_id(); + let tx = SystemTransaction::new_move(&mint_keypair, alice.pubkey(), 20, last_id, 0); + let signature = tx.signatures[0]; + bank.process_transaction(&tx).unwrap(); + + let (subscriber, _id_receiver, mut transport_receiver) = + Subscriber::new_test("signatureNotification"); + let sub_id = SubscriptionId::Number(0 as u64); + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let subscriptions = RpcSubscriptions::default(); + subscriptions.add_signature_subscription(&signature, &sub_id, &sink); + + assert!(subscriptions + .signature_subscriptions + .write() + .unwrap() + .contains_key(&signature)); + + subscriptions.check_signature(&signature, &Ok(())); + let string = transport_receiver.poll(); + if let Async::Ready(Some(response)) = string.unwrap() { + let expected = format!(r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":"Confirmed","subscription":0}}}}"#); + assert_eq!(expected, response); + } + + subscriptions.remove_signature_subscription(&sub_id); + assert!(!subscriptions + .signature_subscriptions + .write() + .unwrap() + .contains_key(&signature)); + } +}