Pull RpcSubscriptions out of the Bank

This commit is contained in:
Greg Fitzgerald 2019-02-17 16:33:25 -07:00
parent a444cac2aa
commit 377d45c9dd
3 changed files with 36 additions and 81 deletions

View File

@ -8,7 +8,6 @@ use crate::counter::Counter;
use crate::genesis_block::GenesisBlock;
use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS};
use crate::poh_service::NUM_TICKS_PER_SECOND;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::status_cache::StatusCache;
use bincode::{deserialize, serialize};
use log::Level;
@ -89,8 +88,6 @@ pub struct Bank {
/// FIFO queue of `last_id` items
last_id_queue: RwLock<LastIdQueue>,
subscriptions: RwLock<Option<Arc<RpcSubscriptions>>>,
parent: Option<Arc<Bank>>,
parent_hash: Hash,
@ -102,7 +99,6 @@ impl Default for Bank {
accounts: Accounts::default(),
last_id_queue: RwLock::new(LastIdQueue::default()),
status_cache: RwLock::new(BankStatusCache::default()),
subscriptions: RwLock::new(None),
parent: None,
parent_hash: Hash::default(),
}
@ -131,11 +127,6 @@ impl Bank {
self.parent.clone()
}
pub fn set_subscriptions(&self, subscriptions: Arc<RpcSubscriptions>) {
let mut sub = self.subscriptions.write().unwrap();
*sub = Some(subscriptions)
}
fn process_genesis_block(&self, genesis_block: &GenesisBlock) {
assert!(genesis_block.mint_id != Pubkey::default());
assert!(genesis_block.bootstrap_leader_id != Pubkey::default());
@ -258,13 +249,6 @@ impl Bank {
self.status_cache.write().unwrap().clear();
}
fn update_subscriptions(&self, txs: &[Transaction], res: &[Result<()>]) {
for (i, tx) in txs.iter().enumerate() {
if let Some(ref subs) = *self.subscriptions.read().unwrap() {
subs.check_signature(&tx.signatures[0], &res[i]);
}
}
}
fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) {
let mut status_cache = self.status_cache.write().unwrap();
for (i, tx) in txs.iter().enumerate() {
@ -491,9 +475,6 @@ impl Bank {
self.accounts
.store_accounts(true, txs, executed, loaded_accounts);
// Check account subscriptions and send notifications
self.send_account_notifications(txs, executed, loaded_accounts);
// once committed there is no way to unroll
let write_elapsed = now.elapsed();
debug!(
@ -502,7 +483,6 @@ impl Bank {
txs.len(),
);
self.update_transaction_statuses(txs, &executed);
self.update_subscriptions(txs, &executed);
}
/// Process a batch of transactions.
@ -609,27 +589,6 @@ impl Bank {
extend_and_hash(&self.parent_hash, &serialize(&accounts_delta_hash).unwrap())
}
fn send_account_notifications(
&self,
txs: &[Transaction],
res: &[Result<()>],
loaded: &[Result<(InstructionAccounts, InstructionLoaders)>],
) {
for (i, raccs) in loaded.iter().enumerate() {
if res[i].is_err() || raccs.is_err() {
continue;
}
let tx = &txs[i];
let accs = raccs.as_ref().unwrap();
for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) {
if let Some(ref subs) = *self.subscriptions.read().unwrap() {
subs.check_account(&key, account)
}
}
}
}
pub fn vote_states<F>(&self, cond: F) -> Vec<VoteState>
where
F: Fn(&VoteState) -> bool,

View File

@ -186,6 +186,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
#[cfg(test)]
mod tests {
use super::*;
use crate::bank;
use crate::genesis_block::GenesisBlock;
use jsonrpc_core::futures::sync::mpsc;
use jsonrpc_core::Response;
@ -194,10 +195,31 @@ mod tests {
use solana_sdk::budget_transaction::BudgetTransaction;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::transaction::Transaction;
use std::thread::sleep;
use std::time::Duration;
use tokio::prelude::{Async, Stream};
pub fn process_transaction_and_notify(
bank: &Bank,
tx: &Transaction,
subscriptions: &RpcSubscriptions,
) -> bank::Result<()> {
bank.process_transaction(tx)?;
for pubkey in &tx.account_keys {
if let Some(account) = &bank.get_account(pubkey) {
subscriptions.check_account(pubkey, account);
}
}
let signature = &tx.signatures[0];
let status = bank.get_signature_status(signature).unwrap();
subscriptions.check_signature(signature, &status);
Ok(())
}
fn create_session() -> Arc<Session> {
Arc::new(Session::new(mpsc::channel(1).0))
}
@ -212,8 +234,6 @@ mod tests {
let last_id = arc_bank.last_id();
let rpc = RpcSolPubSubImpl::new(arc_bank.clone());
let subscriptions = rpc.subscriptions.clone();
arc_bank.set_subscriptions(subscriptions);
// Test signature subscriptions
let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0);
@ -223,9 +243,7 @@ mod tests {
Subscriber::new_test("signatureNotification");
rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string());
arc_bank
.process_transaction(&tx)
.expect("process transaction");
process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap();
sleep(Duration::from_millis(200));
// Test signature confirmation notification
@ -262,11 +280,9 @@ mod tests {
let res = io.handle_request_sync(&req, session.clone());
let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#);
let expected: Response =
serde_json::from_str(&expected).expect("expected response deserialization");
let expected: Response = serde_json::from_str(&expected).unwrap();
let result: Response = serde_json::from_str(&res.expect("actual response"))
.expect("actual response deserialization");
let result: Response = serde_json::from_str(&res.unwrap()).unwrap();
assert_eq!(expected, result);
// Test bad parameter
@ -274,11 +290,9 @@ mod tests {
format!(r#"{{"jsonrpc":"2.0","id":1,"method":"signatureUnsubscribe","params":[1]}}"#);
let res = io.handle_request_sync(&req, session.clone());
let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"#);
let expected: Response =
serde_json::from_str(&expected).expect("expected response deserialization");
let expected: Response = serde_json::from_str(&expected).unwrap();
let result: Response = serde_json::from_str(&res.expect("actual response"))
.expect("actual response deserialization");
let result: Response = serde_json::from_str(&res.unwrap()).unwrap();
assert_eq!(expected, result);
}
@ -296,9 +310,6 @@ mod tests {
let last_id = arc_bank.last_id();
let rpc = RpcSolPubSubImpl::new(arc_bank.clone());
let subscriptions = rpc.subscriptions.clone();
arc_bank.set_subscriptions(subscriptions);
let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe(session, subscriber, contract_state.pubkey().to_string());
@ -312,9 +323,7 @@ mod tests {
budget_program_id,
0,
);
arc_bank
.process_transaction(&tx)
.expect("process transaction");
process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap();
let tx = SystemTransaction::new_program_account(
&alice,
@ -326,9 +335,7 @@ mod tests {
0,
);
arc_bank
.process_transaction(&tx)
.expect("process transaction");
process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap();
// Test signature confirmation notification #1
let string = receiver.poll();
@ -366,9 +373,7 @@ mod tests {
50,
last_id,
);
arc_bank
.process_transaction(&tx)
.expect("process transaction");
process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap();
sleep(Duration::from_millis(200));
// Test signature confirmation notification #2
@ -396,9 +401,7 @@ mod tests {
}
let tx = SystemTransaction::new_account(&alice, witness.pubkey(), 1, last_id, 0);
arc_bank
.process_transaction(&tx)
.expect("process transaction");
process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap();
sleep(Duration::from_millis(200));
let tx = BudgetTransaction::new_signature(
&witness,
@ -406,9 +409,7 @@ mod tests {
bob_pubkey,
last_id,
);
arc_bank
.process_transaction(&tx)
.expect("process transaction");
process_transaction_and_notify(&arc_bank, &tx, &rpc.subscriptions).unwrap();
sleep(Duration::from_millis(200));
let expected_userdata = arc_bank
@ -459,11 +460,9 @@ mod tests {
let res = io.handle_request_sync(&req, session.clone());
let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#);
let expected: Response =
serde_json::from_str(&expected).expect("expected response deserialization");
let expected: Response = serde_json::from_str(&expected).unwrap();
let result: Response = serde_json::from_str(&res.expect("actual response"))
.expect("actual response deserialization");
let result: Response = serde_json::from_str(&res.unwrap()).unwrap();
assert_eq!(expected, result);
// Test bad parameter
@ -471,11 +470,9 @@ mod tests {
format!(r#"{{"jsonrpc":"2.0","id":1,"method":"accountUnsubscribe","params":[1]}}"#);
let res = io.handle_request_sync(&req, session.clone());
let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"#);
let expected: Response =
serde_json::from_str(&expected).expect("expected response deserialization");
let expected: Response = serde_json::from_str(&expected).unwrap();
let result: Response = serde_json::from_str(&res.expect("actual response"))
.expect("actual response deserialization");
let result: Response = serde_json::from_str(&res.unwrap()).unwrap();
assert_eq!(expected, result);
}
}

View File

@ -30,7 +30,6 @@ impl PubSubService {
info!("rpc_pubsub bound to {:?}", pubsub_addr);
let subscriptions = Arc::new(RpcSubscriptions::default());
let rpc = RpcSolPubSubImpl::new_with_subscriptions(bank.clone(), subscriptions.clone());
bank.set_subscriptions(subscriptions);
let exit = Arc::new(AtomicBool::new(false));
let exit_ = exit.clone();
let thread_hdl = Builder::new()