diff --git a/src/bank.rs b/src/bank.rs index 0d4c1178fb..2942c59247 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -557,6 +557,10 @@ impl Bank { Accounts::load_slow(&accounts, pubkey) } + pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option { + Accounts::load_slow(&[&self.accounts], pubkey) + } + pub fn transaction_count(&self) -> u64 { self.accounts.transaction_count() } diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index 27387a8315..00a6306741 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -200,24 +200,16 @@ mod tests { use std::time::Duration; use tokio::prelude::{Async, Stream}; - pub fn process_transaction_and_notify( - bank: &Bank, + fn process_transaction_and_notify( + bank: &Arc, tx: &Transaction, subscriptions: &RpcSubscriptions, - ) -> bank::Result<()> { + ) -> bank::Result> { bank.process_transaction(tx)?; + subscriptions.notify_subscribers(&bank); - for pubkey in &tx.account_keys { - if let Some(account) = &bank.get_account(pubkey) { - subscriptions.check_account(pubkey, account); - } - } - - let signature = &tx.signatures[0]; - let status = bank.get_signature_status(signature).unwrap(); - subscriptions.check_signature(signature, &status); - - Ok(()) + // Simulate a block boundary + Ok(Arc::new(Bank::new_from_parent(&bank))) } fn create_session() -> Arc { @@ -323,7 +315,7 @@ mod tests { budget_program_id, 0, ); - process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); + let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); let tx = SystemTransaction::new_program_account( &alice, @@ -335,7 +327,7 @@ mod tests { 0, ); - process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); + let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); // Test signature confirmation notification #1 let string = receiver.poll(); @@ -363,7 +355,6 @@ mod tests { if let Async::Ready(Some(response)) = string.unwrap() { assert_eq!(serde_json::to_string(&expected).unwrap(), response); } - let tx = BudgetTransaction::new_when_signed( &contract_funds, bob_pubkey, @@ -373,7 +364,7 @@ mod tests { 50, last_id, ); - process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); + let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); // Test signature confirmation notification #2 @@ -401,7 +392,7 @@ mod tests { } let tx = SystemTransaction::new_account(&alice, witness.pubkey(), 1, last_id, 0); - process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); + let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); let tx = BudgetTransaction::new_signature( &witness, @@ -409,7 +400,7 @@ mod tests { bob_pubkey, last_id, ); - process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); + let arc_bank = process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap(); sleep(Duration::from_millis(200)); let expected_userdata = arc_bank diff --git a/src/rpc_service.rs b/src/rpc_service.rs index 5f9a796aad..2af5e2e0ed 100644 --- a/src/rpc_service.rs +++ b/src/rpc_service.rs @@ -5,7 +5,6 @@ use crate::cluster_info::ClusterInfo; use crate::rpc::*; use crate::service::Service; use crate::storage_stage::StorageState; -use bs58; use jsonrpc_core::MetaIoHandler; use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder}; use std::net::SocketAddr; diff --git a/src/rpc_subscriptions.rs b/src/rpc_subscriptions.rs index 88ce743eb1..0ba9be271d 100644 --- a/src/rpc_subscriptions.rs +++ b/src/rpc_subscriptions.rs @@ -1,6 +1,6 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::bank::{self, BankError}; +use crate::bank::{self, Bank, BankError}; use crate::rpc_status::RpcSignatureStatus; use jsonrpc_core::futures::Future; use jsonrpc_pubsub::typed::Sink; @@ -116,12 +116,34 @@ impl RpcSubscriptions { }); found } + + /// Notify subscribers of changes to any accounts or new signatures since + /// the bank's last checkpoint. + pub fn notify_subscribers(&self, bank: &Bank) { + let pubkeys: Vec<_> = { + let subs = self.account_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for pubkey in &pubkeys { + if let Some(account) = &bank.get_account_modified_since_parent(pubkey) { + self.check_account(pubkey, account); + } + } + + let signatures: Vec<_> = { + let subs = self.signature_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for signature in &signatures { + let status = bank.get_signature_status(signature).unwrap(); + self.check_signature(signature, &status); + } + } } #[cfg(test)] mod tests { use super::*; - use crate::bank::Bank; use crate::genesis_block::GenesisBlock; use jsonrpc_pubsub::typed::Subscriber; use solana_sdk::budget_program; @@ -155,7 +177,7 @@ mod tests { assert!(subscriptions .account_subscriptions - .write() + .read() .unwrap() .contains_key(&alice.pubkey())); @@ -170,7 +192,7 @@ mod tests { subscriptions.remove_account_subscription(&sub_id); assert!(!subscriptions .account_subscriptions - .write() + .read() .unwrap() .contains_key(&alice.pubkey())); } @@ -193,7 +215,7 @@ mod tests { assert!(subscriptions .signature_subscriptions - .write() + .read() .unwrap() .contains_key(&signature)); @@ -207,7 +229,7 @@ mod tests { subscriptions.remove_signature_subscription(&sub_id); assert!(!subscriptions .signature_subscriptions - .write() + .read() .unwrap() .contains_key(&signature)); }