From 6611188edf558757dcb0244b75986522ae9eecd8 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Mon, 21 Jan 2019 09:59:09 -0800 Subject: [PATCH] Move subscriptions to rpc_pubsub (#2490) * Move subscriptions to rpc_pubsub - this helps avoid recreating pubsub_service on node's role change * fixed tests and addressed review comments * fix clippy errors * address review comments --- src/bank.rs | 228 ++++++----------------------------- src/fullnode.rs | 20 +--- src/rpc_pubsub.rs | 295 +++++++++++++++++++++++++++++++++++++++------- 3 files changed, 291 insertions(+), 252 deletions(-) diff --git a/src/bank.rs b/src/bank.rs index 359ce8bbfa..b7f2cd95f5 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -8,16 +8,13 @@ use crate::checkpoint::Checkpoint; use crate::counter::Counter; use crate::entry::Entry; use crate::entry::EntrySlice; -use crate::jsonrpc_macros::pubsub::Sink; use crate::leader_scheduler::LeaderScheduler; use crate::mint::Mint; use crate::poh_recorder::PohRecorder; -use crate::rpc::RpcSignatureStatus; use crate::runtime::{self, RuntimeError}; use crate::status_deque::{Status, StatusDeque, MAX_ENTRY_IDS}; use crate::storage_stage::StorageState; use bincode::deserialize; -use hashbrown::HashMap; use itertools::Itertools; use log::Level; use rayon::prelude::*; @@ -44,7 +41,6 @@ use std::result; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Instant; -use tokio::prelude::Future; /// Reasons a transaction might be rejected. #[derive(Debug, PartialEq, Eq, Clone)] @@ -87,6 +83,23 @@ pub type Result = result::Result; pub const VERIFY_BLOCK_SIZE: usize = 16; +pub trait BankSubscriptions { + fn check_account(&self, pubkey: &Pubkey, account: &Account); + fn check_signature(&self, signature: &Signature, status: &Result<()>); +} + +struct LocalSubscriptions {} +impl Default for LocalSubscriptions { + fn default() -> Self { + LocalSubscriptions {} + } +} + +impl BankSubscriptions for LocalSubscriptions { + fn check_account(&self, _pubkey: &Pubkey, _account: &Account) {} + fn check_signature(&self, _signature: &Signature, _status: &Result<()>) {} +} + /// Manager for the state of all accounts and programs after processing its entries. pub struct Bank { pub accounts: Accounts, @@ -97,17 +110,13 @@ pub struct Bank { // The latest confirmation time for the network confirmation_time: AtomicUsize, - // Mapping of account ids to Subscriber ids and sinks to notify on userdata update - account_subscriptions: RwLock>>>, - - // Mapping of signatures to Subscriber ids and sinks to notify on confirmation - signature_subscriptions: RwLock>>>, - /// Tracks and updates the leader schedule based on the votes and account stakes /// processed by the bank pub leader_scheduler: Arc>, pub storage_state: StorageState, + + subscriptions: RwLock>>, } impl Default for Bank { @@ -116,10 +125,9 @@ impl Default for Bank { accounts: Accounts::default(), last_ids: RwLock::new(StatusDeque::default()), confirmation_time: AtomicUsize::new(std::usize::MAX), - account_subscriptions: RwLock::new(HashMap::new()), - signature_subscriptions: RwLock::new(HashMap::new()), leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())), storage_state: StorageState::new(), + subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))), } } } @@ -145,6 +153,11 @@ impl Bank { bank } + pub fn set_subscriptions(&self, subscriptions: Box>) { + let mut sub = self.subscriptions.write().unwrap(); + *sub = subscriptions + } + pub fn checkpoint(&self) { self.accounts.checkpoint(); self.last_ids.write().unwrap().checkpoint(); @@ -160,7 +173,10 @@ impl Bank { rolled_back_pubkeys.iter().for_each(|pubkey| { if let Some(account) = self.accounts.load_slow(&pubkey) { - self.check_account_subscriptions(&pubkey, &account) + self.subscriptions + .read() + .unwrap() + .check_account(&pubkey, &account) } }); @@ -330,15 +346,10 @@ impl Bank { let mut last_ids = self.last_ids.write().unwrap(); for (i, tx) in txs.iter().enumerate() { last_ids.update_signature_status_with_last_id(&tx.signatures[0], &res[i], &tx.last_id); - let status = match res[i] { - Ok(_) => RpcSignatureStatus::Confirmed, - Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse, - Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError, - Err(_) => RpcSignatureStatus::GenericFailure, - }; - if status != RpcSignatureStatus::SignatureNotFound { - self.check_signature_subscriptions(&tx.signatures[0], status); - } + self.subscriptions + .read() + .unwrap() + .check_signature(&tx.signatures[0], &res[i]); } } @@ -887,41 +898,14 @@ impl Bank { let tx = &txs[i]; let accs = raccs.as_ref().unwrap(); for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) { - self.check_account_subscriptions(&key, account); + self.subscriptions + .read() + .unwrap() + .check_account(&key, account); } } } - pub fn add_account_subscription( - &self, - bank_sub_id: Pubkey, - pubkey: Pubkey, - sink: Sink, - ) { - let mut subscriptions = self.account_subscriptions.write().unwrap(); - if let Some(current_hashmap) = subscriptions.get_mut(&pubkey) { - current_hashmap.insert(bank_sub_id, sink); - return; - } - let mut hashmap = HashMap::new(); - hashmap.insert(bank_sub_id, sink); - subscriptions.insert(pubkey, hashmap); - } - - pub fn remove_account_subscription(&self, bank_sub_id: &Pubkey, pubkey: &Pubkey) -> bool { - let mut subscriptions = self.account_subscriptions.write().unwrap(); - match subscriptions.get_mut(pubkey) { - Some(ref current_hashmap) if current_hashmap.len() == 1 => {} - Some(current_hashmap) => { - return current_hashmap.remove(bank_sub_id).is_some(); - } - None => { - return false; - } - } - subscriptions.remove(pubkey).is_some() - } - pub fn get_current_leader(&self) -> Option<(Pubkey, u64)> { self.leader_scheduler .read() @@ -932,66 +916,12 @@ impl Bank { pub fn tick_height(&self) -> u64 { self.last_ids.read().unwrap().tick_height } - - fn check_account_subscriptions(&self, pubkey: &Pubkey, account: &Account) { - let subscriptions = self.account_subscriptions.read().unwrap(); - if let Some(hashmap) = subscriptions.get(pubkey) { - for (_bank_sub_id, sink) in hashmap.iter() { - sink.notify(Ok(account.clone())).wait().unwrap(); - } - } - } - - pub fn add_signature_subscription( - &self, - bank_sub_id: Pubkey, - signature: Signature, - sink: Sink, - ) { - let mut subscriptions = self.signature_subscriptions.write().unwrap(); - if let Some(current_hashmap) = subscriptions.get_mut(&signature) { - current_hashmap.insert(bank_sub_id, sink); - return; - } - let mut hashmap = HashMap::new(); - hashmap.insert(bank_sub_id, sink); - subscriptions.insert(signature, hashmap); - } - - pub fn remove_signature_subscription( - &self, - bank_sub_id: &Pubkey, - signature: &Signature, - ) -> bool { - let mut subscriptions = self.signature_subscriptions.write().unwrap(); - match subscriptions.get_mut(signature) { - Some(ref current_hashmap) if current_hashmap.len() == 1 => {} - Some(current_hashmap) => { - return current_hashmap.remove(bank_sub_id).is_some(); - } - None => { - return false; - } - } - subscriptions.remove(signature).is_some() - } - - fn check_signature_subscriptions(&self, signature: &Signature, status: RpcSignatureStatus) { - let mut subscriptions = self.signature_subscriptions.write().unwrap(); - if let Some(hashmap) = subscriptions.get(signature) { - for (_bank_sub_id, sink) in hashmap.iter() { - sink.notify(Ok(status)).wait().unwrap(); - } - } - subscriptions.remove(&signature); - } } #[cfg(test)] mod tests { use super::*; use crate::entry::{next_entries, next_entry, Entry}; - use crate::jsonrpc_macros::pubsub::{Subscriber, SubscriptionId}; use crate::signature::GenKeys; use crate::status_deque; use crate::status_deque::StatusDequeError; @@ -1006,7 +936,6 @@ mod tests { use solana_sdk::transaction::Instruction; use std; use std::sync::mpsc::channel; - use tokio::prelude::{Async, Stream}; #[test] fn test_bank_new() { @@ -1447,90 +1376,7 @@ mod tests { Ok(_) ); } - #[test] - fn test_bank_account_subscribe() { - let mint = Mint::new(100); - let bank = Bank::new(&mint); - let alice = Keypair::new(); - let bank_sub_id = Keypair::new().pubkey(); - let last_id = bank.last_id(); - let tx = Transaction::system_create( - &mint.keypair(), - alice.pubkey(), - last_id, - 1, - 16, - budget_program::id(), - 0, - ); - bank.process_transaction(&tx).unwrap(); - let (subscriber, _id_receiver, mut transport_receiver) = - Subscriber::new_test("accountNotification"); - let sub_id = SubscriptionId::Number(0 as u64); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - bank.add_account_subscription(bank_sub_id, alice.pubkey(), sink); - - assert!(bank - .account_subscriptions - .write() - .unwrap() - .contains_key(&alice.pubkey())); - - let account = bank.get_account(&alice.pubkey()).unwrap(); - bank.check_account_subscriptions(&alice.pubkey(), &account); - let string = transport_receiver.poll(); - assert!(string.is_ok()); - if let Async::Ready(Some(response)) = string.unwrap() { - let expected = format!(r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"executable":false,"loader":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"owner":[129,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"tokens":1,"userdata":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"subscription":0}}}}"#); - assert_eq!(expected, response); - } - - bank.remove_account_subscription(&bank_sub_id, &alice.pubkey()); - assert!(!bank - .account_subscriptions - .write() - .unwrap() - .contains_key(&alice.pubkey())); - } - #[test] - fn test_bank_signature_subscribe() { - let mint = Mint::new(100); - let bank = Bank::new(&mint); - let alice = Keypair::new(); - let bank_sub_id = Keypair::new().pubkey(); - let last_id = bank.last_id(); - let tx = Transaction::system_move(&mint.keypair(), alice.pubkey(), 20, last_id, 0); - let signature = tx.signatures[0]; - bank.process_transaction(&tx).unwrap(); - - let (subscriber, _id_receiver, mut transport_receiver) = - Subscriber::new_test("signatureNotification"); - let sub_id = SubscriptionId::Number(0 as u64); - let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - bank.add_signature_subscription(bank_sub_id, signature, sink); - - assert!(bank - .signature_subscriptions - .write() - .unwrap() - .contains_key(&signature)); - - bank.check_signature_subscriptions(&signature, RpcSignatureStatus::Confirmed); - let string = transport_receiver.poll(); - assert!(string.is_ok()); - if let Async::Ready(Some(response)) = string.unwrap() { - let expected = format!(r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":"Confirmed","subscription":0}}}}"#); - assert_eq!(expected, response); - } - - bank.remove_signature_subscription(&bank_sub_id, &signature); - assert!(!bank - .signature_subscriptions - .write() - .unwrap() - .contains_key(&signature)); - } #[test] fn test_first_err() { assert_eq!(Bank::first_err(&[Ok(())]), Ok(())); diff --git a/src/fullnode.rs b/src/fullnode.rs index d5f89f915e..edb19b96d4 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -104,7 +104,6 @@ pub struct Fullnode { retransmit_socket: UdpSocket, tpu_sockets: Vec, broadcast_socket: UdpSocket, - rpc_pubsub_addr: SocketAddr, db_ledger: Arc, vote_signer: Arc, } @@ -370,7 +369,6 @@ impl Fullnode { retransmit_socket: node.sockets.retransmit, tpu_sockets: node.sockets.tpu, broadcast_socket: node.sockets.broadcast, - rpc_pubsub_addr, db_ledger, vote_signer, } @@ -379,12 +377,6 @@ impl Fullnode { fn leader_to_validator(&mut self) -> Result<()> { trace!("leader_to_validator"); - // Close down any services that could have a reference to the bank - if self.rpc_pubsub_service.is_some() { - let old_rpc_pubsub_service = self.rpc_pubsub_service.take().unwrap(); - old_rpc_pubsub_service.close()?; - } - // Correctness check: Ensure that references to the bank and leader scheduler are no // longer held by any running thread let mut new_leader_scheduler = self.bank.leader_scheduler.read().unwrap().clone(); @@ -418,15 +410,9 @@ impl Fullnode { rpc_service.set_bank(&new_bank); } - // TODO: Don't restart PubSubService on leader rotation - // See https://github.com/solana-labs/solana/issues/2419 - self.rpc_pubsub_service = Some(PubSubService::new( - &new_bank, - SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - self.rpc_pubsub_addr.port(), - ), - )); + if let Some(ref mut rpc_pubsub_service) = self.rpc_pubsub_service { + rpc_pubsub_service.set_bank(&new_bank); + } self.bank = new_bank; diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index 43aea765a6..e32a7c8b98 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -1,9 +1,11 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::bank::Bank; +use crate::bank; +use crate::bank::{Bank, BankError, BankSubscriptions}; use crate::jsonrpc_core::futures::Future; use crate::jsonrpc_core::*; use crate::jsonrpc_macros::pubsub; +use crate::jsonrpc_macros::pubsub::Sink; use crate::jsonrpc_pubsub::{PubSubHandler, Session, SubscriptionId}; use crate::jsonrpc_ws_server::{RequestContext, Sender, ServerBuilder}; use crate::rpc::RpcSignatureStatus; @@ -12,7 +14,7 @@ use crate::status_deque::Status; use bs58; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; +use solana_sdk::signature::Signature; use std::collections::HashMap; use std::mem; use std::net::SocketAddr; @@ -29,6 +31,8 @@ pub enum ClientState { pub struct PubSubService { thread_hdl: JoinHandle<()>, exit: Arc, + rpc_bank: Arc>, + subscription: Arc, } impl Service for PubSubService { @@ -42,7 +46,10 @@ impl Service for PubSubService { impl PubSubService { pub fn new(bank: &Arc, pubsub_addr: SocketAddr) -> Self { info!("rpc_pubsub bound to {:?}", pubsub_addr); - let rpc = RpcSolPubSubImpl::new(bank.clone()); + let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(bank.clone()))); + let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); + let subscription = rpc.subscription.clone(); + bank.set_subscriptions(Box::new(subscription.clone())); let exit = Arc::new(AtomicBool::new(false)); let exit_ = exit.clone(); let thread_hdl = Builder::new() @@ -71,7 +78,17 @@ impl PubSubService { server.unwrap().close(); }) .unwrap(); - PubSubService { thread_hdl, exit } + PubSubService { + thread_hdl, + exit, + rpc_bank, + subscription, + } + } + + pub fn set_bank(&self, bank: &Arc) { + self.rpc_bank.write().unwrap().bank = bank.clone(); + bank.set_subscriptions(Box::new(self.subscription.clone())); } pub fn exit(&self) { @@ -111,20 +128,135 @@ build_rpc_trait! { } } +struct RpcPubSubBank { + bank: Arc, +} + +impl RpcPubSubBank { + pub fn new(bank: Arc) -> Self { + RpcPubSubBank { bank } + } +} + +pub struct RpcSubscriptions { + account_subscriptions: RwLock>>>, + signature_subscriptions: + RwLock>>>, +} + +impl Default for RpcSubscriptions { + fn default() -> Self { + RpcSubscriptions { + account_subscriptions: Default::default(), + signature_subscriptions: Default::default(), + } + } +} + +impl BankSubscriptions for RpcSubscriptions { + fn check_account(&self, pubkey: &Pubkey, account: &Account) { + let subscriptions = self.account_subscriptions.read().unwrap(); + if let Some(hashmap) = subscriptions.get(pubkey) { + for (_bank_sub_id, sink) in hashmap.iter() { + sink.notify(Ok(account.clone())).wait().unwrap(); + } + } + } + + fn check_signature(&self, signature: &Signature, bank_error: &bank::Result<()>) { + let status = match bank_error { + Ok(_) => RpcSignatureStatus::Confirmed, + Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse, + Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError, + Err(_) => RpcSignatureStatus::GenericFailure, + }; + + let mut subscriptions = self.signature_subscriptions.write().unwrap(); + if let Some(hashmap) = subscriptions.get(signature) { + for (_bank_sub_id, sink) in hashmap.iter() { + sink.notify(Ok(status)).wait().unwrap(); + } + } + subscriptions.remove(&signature); + } +} + +impl RpcSubscriptions { + pub fn add_account_subscription( + &self, + pubkey: &Pubkey, + sub_id: &SubscriptionId, + sink: &Sink, + ) { + let mut subscriptions = self.account_subscriptions.write().unwrap(); + if let Some(current_hashmap) = subscriptions.get_mut(pubkey) { + current_hashmap.insert(sub_id.clone(), sink.clone()); + return; + } + let mut hashmap = HashMap::new(); + hashmap.insert(sub_id.clone(), sink.clone()); + subscriptions.insert(*pubkey, hashmap); + } + + pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool { + let mut subscriptions = self.account_subscriptions.write().unwrap(); + let mut found = false; + subscriptions.retain(|_, v| { + v.retain(|k, _| { + if *k == *id { + found = true; + } + !found + }); + !v.is_empty() + }); + found + } + + pub fn add_signature_subscription( + &self, + signature: &Signature, + sub_id: &SubscriptionId, + sink: &Sink, + ) { + let mut subscriptions = self.signature_subscriptions.write().unwrap(); + if let Some(current_hashmap) = subscriptions.get_mut(signature) { + current_hashmap.insert(sub_id.clone(), sink.clone()); + return; + } + let mut hashmap = HashMap::new(); + hashmap.insert(sub_id.clone(), sink.clone()); + subscriptions.insert(*signature, hashmap); + } + + pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool { + let mut subscriptions = self.signature_subscriptions.write().unwrap(); + let mut found = false; + subscriptions.retain(|_, v| { + v.retain(|k, _| { + if *k == *id { + found = true; + } + !found + }); + !v.is_empty() + }); + found + } +} + struct RpcSolPubSubImpl { uid: Arc, - bank: Arc, - account_subscriptions: Arc>>, - signature_subscriptions: Arc>>, + bank: Arc>, + subscription: Arc, } impl RpcSolPubSubImpl { - fn new(bank: Arc) -> Self { + fn new(bank: Arc>) -> Self { RpcSolPubSubImpl { uid: Default::default(), bank, - account_subscriptions: Default::default(), - signature_subscriptions: Default::default(), + subscription: Arc::new(Default::default()), } } } @@ -155,21 +287,14 @@ impl RpcSolPubSub for RpcSolPubSubImpl { let sub_id = SubscriptionId::Number(id as u64); info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let bank_sub_id = Keypair::new().pubkey(); - self.account_subscriptions - .write() - .unwrap() - .insert(sub_id.clone(), (bank_sub_id, pubkey)); - self.bank - .add_account_subscription(bank_sub_id, pubkey, sink); + self.subscription + .add_account_subscription(&pubkey, &sub_id, &sink) } fn account_unsubscribe(&self, id: SubscriptionId) -> Result { info!("account_unsubscribe: id={:?}", id); - if let Some((bank_sub_id, pubkey)) = self.account_subscriptions.write().unwrap().remove(&id) - { - self.bank.remove_account_subscription(&bank_sub_id, &pubkey); + if self.subscription.remove_account_subscription(&id) { Ok(true) } else { Err(Error { @@ -199,18 +324,19 @@ impl RpcSolPubSub for RpcSolPubSubImpl { return; } let signature = Signature::new(&signature_vec); - let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst); let sub_id = SubscriptionId::Number(id as u64); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let bank_sub_id = Keypair::new().pubkey(); - self.signature_subscriptions - .write() - .unwrap() - .insert(sub_id.clone(), (bank_sub_id, signature)); - let status = self.bank.get_signature_status(&signature); + let status = self + .bank + .read() + .unwrap() + .bank + .get_signature_status(&signature); if status.is_none() { + self.subscription + .add_signature_subscription(&signature, &sub_id, &sink); return; } @@ -219,25 +345,16 @@ impl RpcSolPubSub for RpcSolPubSubImpl { sink.notify(Ok(RpcSignatureStatus::Confirmed)) .wait() .unwrap(); - self.signature_subscriptions - .write() - .unwrap() - .remove(&sub_id); - } - _ => { - self.bank - .add_signature_subscription(bank_sub_id, signature, sink); } + _ => self + .subscription + .add_signature_subscription(&signature, &sub_id, &sink), } } fn signature_unsubscribe(&self, id: SubscriptionId) -> Result { info!("signature_unsubscribe"); - if let Some((bank_sub_id, signature)) = - self.signature_subscriptions.write().unwrap().remove(&id) - { - self.bank - .remove_signature_subscription(&bank_sub_id, &signature); + if self.subscription.remove_signature_subscription(&id) { Ok(true) } else { Err(Error { @@ -253,6 +370,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { mod tests { use super::*; use crate::jsonrpc_core::futures::sync::mpsc; + use crate::jsonrpc_macros::pubsub::{Subscriber, SubscriptionId}; use crate::mint::Mint; use solana_sdk::budget_program; use solana_sdk::budget_transaction::BudgetTransaction; @@ -286,7 +404,8 @@ mod tests { let session = Arc::new(Session::new(sender)); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); + let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); + let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); io.extend_with(rpc.to_delegate()); // Test signature subscription @@ -359,7 +478,8 @@ mod tests { let session = Arc::new(Session::new(sender)); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); + let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); + let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); io.extend_with(rpc.to_delegate()); let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0); @@ -413,7 +533,8 @@ mod tests { let session = Arc::new(Session::new(sender)); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); + let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); + let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); io.extend_with(rpc.to_delegate()); let req = format!( @@ -591,7 +712,8 @@ mod tests { let session = Arc::new(Session::new(sender)); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); + let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); + let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); io.extend_with(rpc.to_delegate()); @@ -625,4 +747,89 @@ mod tests { .expect("actual response deserialization"); assert_eq!(expected, result); } + + #[test] + fn test_check_account_subscribe() { + let mint = Mint::new(100); + let bank = Bank::new(&mint); + let alice = Keypair::new(); + let last_id = bank.last_id(); + let tx = Transaction::system_create( + &mint.keypair(), + alice.pubkey(), + last_id, + 1, + 16, + budget_program::id(), + 0, + ); + bank.process_transaction(&tx).unwrap(); + + let (subscriber, _id_receiver, mut transport_receiver) = + Subscriber::new_test("accountNotification"); + let sub_id = SubscriptionId::Number(0 as u64); + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let subscriptions = RpcSubscriptions::default(); + subscriptions.add_account_subscription(&alice.pubkey(), &sub_id, &sink); + + assert!(subscriptions + .account_subscriptions + .write() + .unwrap() + .contains_key(&alice.pubkey())); + + let account = bank.get_account(&alice.pubkey()).unwrap(); + subscriptions.check_account(&alice.pubkey(), &account); + let string = transport_receiver.poll(); + assert!(string.is_ok()); + if let Async::Ready(Some(response)) = string.unwrap() { + let expected = format!(r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"executable":false,"loader":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"owner":[129,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"tokens":1,"userdata":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"subscription":0}}}}"#); + assert_eq!(expected, response); + } + + subscriptions.remove_account_subscription(&sub_id); + assert!(!subscriptions + .account_subscriptions + .write() + .unwrap() + .contains_key(&alice.pubkey())); + } + #[test] + fn test_check_signature_subscribe() { + let mint = Mint::new(100); + let bank = Bank::new(&mint); + let alice = Keypair::new(); + let last_id = bank.last_id(); + let tx = Transaction::system_move(&mint.keypair(), alice.pubkey(), 20, last_id, 0); + let signature = tx.signatures[0]; + bank.process_transaction(&tx).unwrap(); + + let (subscriber, _id_receiver, mut transport_receiver) = + Subscriber::new_test("signatureNotification"); + let sub_id = SubscriptionId::Number(0 as u64); + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let subscriptions = RpcSubscriptions::default(); + subscriptions.add_signature_subscription(&signature, &sub_id, &sink); + + assert!(subscriptions + .signature_subscriptions + .write() + .unwrap() + .contains_key(&signature)); + + subscriptions.check_signature(&signature, &Ok(())); + let string = transport_receiver.poll(); + assert!(string.is_ok()); + if let Async::Ready(Some(response)) = string.unwrap() { + let expected = format!(r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":"Confirmed","subscription":0}}}}"#); + assert_eq!(expected, response); + } + + subscriptions.remove_signature_subscription(&sub_id); + assert!(!subscriptions + .signature_subscriptions + .write() + .unwrap() + .contains_key(&signature)); + } }