Move RpcSubscriptions into its own module

This commit is contained in:
Greg Fitzgerald 2019-02-17 09:38:36 -07:00
parent 9f7fc5f054
commit 50d3fa7437
4 changed files with 217 additions and 195 deletions

View File

@ -8,7 +8,7 @@ use crate::counter::Counter;
use crate::genesis_block::GenesisBlock;
use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS};
use crate::poh_service::NUM_TICKS_PER_SECOND;
use crate::rpc_pubsub::RpcSubscriptions;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::status_cache::StatusCache;
use bincode::deserialize;
use log::Level;

View File

@ -62,6 +62,7 @@ pub mod rpc;
pub mod rpc_mock;
pub mod rpc_pubsub;
pub mod rpc_request;
pub mod rpc_subscriptions;
pub mod service;
pub mod sigverify;
pub mod sigverify_stage;

View File

@ -1,20 +1,19 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::bank;
use crate::bank::{Bank, BankError};
use crate::bank::Bank;
use crate::rpc::RpcSignatureStatus;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use bs58;
use jsonrpc_core::futures::Future;
use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::typed::{Sink, Subscriber};
use jsonrpc_pubsub::typed::Subscriber;
use jsonrpc_pubsub::{PubSubHandler, Session, SubscriptionId};
use jsonrpc_ws_server::{RequestContext, ServerBuilder};
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use std::collections::HashMap;
use std::mem;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
@ -144,113 +143,6 @@ impl RpcPubSubBank {
}
}
type RpcAccountSubscriptions = RwLock<HashMap<Pubkey, HashMap<SubscriptionId, Sink<Account>>>>;
type RpcSignatureSubscriptions =
RwLock<HashMap<Signature, HashMap<SubscriptionId, Sink<RpcSignatureStatus>>>>;
pub struct RpcSubscriptions {
account_subscriptions: RpcAccountSubscriptions,
signature_subscriptions: RpcSignatureSubscriptions,
}
impl Default for RpcSubscriptions {
fn default() -> Self {
RpcSubscriptions {
account_subscriptions: RpcAccountSubscriptions::default(),
signature_subscriptions: RpcSignatureSubscriptions::default(),
}
}
}
impl RpcSubscriptions {
pub fn check_account(&self, pubkey: &Pubkey, account: &Account) {
let subscriptions = self.account_subscriptions.read().unwrap();
if let Some(hashmap) = subscriptions.get(pubkey) {
for (_bank_sub_id, sink) in hashmap.iter() {
sink.notify(Ok(account.clone())).wait().unwrap();
}
}
}
pub fn check_signature(&self, signature: &Signature, bank_error: &bank::Result<()>) {
let status = match bank_error {
Ok(_) => RpcSignatureStatus::Confirmed,
Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse,
Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError,
Err(_) => RpcSignatureStatus::GenericFailure,
};
let mut subscriptions = self.signature_subscriptions.write().unwrap();
if let Some(hashmap) = subscriptions.get(signature) {
for (_bank_sub_id, sink) in hashmap.iter() {
sink.notify(Ok(status)).wait().unwrap();
}
}
subscriptions.remove(&signature);
}
pub fn add_account_subscription(
&self,
pubkey: &Pubkey,
sub_id: &SubscriptionId,
sink: &Sink<Account>,
) {
let mut subscriptions = self.account_subscriptions.write().unwrap();
if let Some(current_hashmap) = subscriptions.get_mut(pubkey) {
current_hashmap.insert(sub_id.clone(), sink.clone());
return;
}
let mut hashmap = HashMap::new();
hashmap.insert(sub_id.clone(), sink.clone());
subscriptions.insert(*pubkey, hashmap);
}
pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.account_subscriptions.write().unwrap();
let mut found = false;
subscriptions.retain(|_, v| {
v.retain(|k, _| {
if *k == *id {
found = true;
}
!found
});
!v.is_empty()
});
found
}
pub fn add_signature_subscription(
&self,
signature: &Signature,
sub_id: &SubscriptionId,
sink: &Sink<RpcSignatureStatus>,
) {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
if let Some(current_hashmap) = subscriptions.get_mut(signature) {
current_hashmap.insert(sub_id.clone(), sink.clone());
return;
}
let mut hashmap = HashMap::new();
hashmap.insert(sub_id.clone(), sink.clone());
subscriptions.insert(*signature, hashmap);
}
pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
let mut found = false;
subscriptions.retain(|_, v| {
v.retain(|k, _| {
if *k == *id {
found = true;
}
!found
});
!v.is_empty()
});
found
}
}
struct RpcSolPubSubImpl {
uid: Arc<atomic::AtomicUsize>,
bank: Arc<RwLock<RpcPubSubBank>>,
@ -695,87 +587,4 @@ mod tests {
.expect("actual response deserialization");
assert_eq!(expected, result);
}
#[test]
fn test_check_account_subscribe() {
let (genesis_block, mint_keypair) = GenesisBlock::new(100);
let bank = Bank::new(&genesis_block);
let alice = Keypair::new();
let last_id = bank.last_id();
let tx = SystemTransaction::new_program_account(
&mint_keypair,
alice.pubkey(),
last_id,
1,
16,
budget_program::id(),
0,
);
bank.process_transaction(&tx).unwrap();
let (subscriber, _id_receiver, mut transport_receiver) =
Subscriber::new_test("accountNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default();
subscriptions.add_account_subscription(&alice.pubkey(), &sub_id, &sink);
assert!(subscriptions
.account_subscriptions
.write()
.unwrap()
.contains_key(&alice.pubkey()));
let account = bank.get_account(&alice.pubkey()).unwrap();
subscriptions.check_account(&alice.pubkey(), &account);
let string = transport_receiver.poll();
if let Async::Ready(Some(response)) = string.unwrap() {
let expected = format!(r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"executable":false,"owner":[129,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"tokens":1,"userdata":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"subscription":0}}}}"#);
assert_eq!(expected, response);
}
subscriptions.remove_account_subscription(&sub_id);
assert!(!subscriptions
.account_subscriptions
.write()
.unwrap()
.contains_key(&alice.pubkey()));
}
#[test]
fn test_check_signature_subscribe() {
let (genesis_block, mint_keypair) = GenesisBlock::new(100);
let bank = Bank::new(&genesis_block);
let alice = Keypair::new();
let last_id = bank.last_id();
let tx = SystemTransaction::new_move(&mint_keypair, alice.pubkey(), 20, last_id, 0);
let signature = tx.signatures[0];
bank.process_transaction(&tx).unwrap();
let (subscriber, _id_receiver, mut transport_receiver) =
Subscriber::new_test("signatureNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default();
subscriptions.add_signature_subscription(&signature, &sub_id, &sink);
assert!(subscriptions
.signature_subscriptions
.write()
.unwrap()
.contains_key(&signature));
subscriptions.check_signature(&signature, &Ok(()));
let string = transport_receiver.poll();
if let Async::Ready(Some(response)) = string.unwrap() {
let expected = format!(r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":"Confirmed","subscription":0}}}}"#);
assert_eq!(expected, response);
}
subscriptions.remove_signature_subscription(&sub_id);
assert!(!subscriptions
.signature_subscriptions
.write()
.unwrap()
.contains_key(&signature));
}
}

212
src/rpc_subscriptions.rs Normal file
View File

@ -0,0 +1,212 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::bank::{self, Bank, BankError};
use crate::rpc::RpcSignatureStatus;
use jsonrpc_core::futures::Future;
use jsonrpc_pubsub::typed::{Sink, Subscriber};
use jsonrpc_pubsub::SubscriptionId;
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use std::collections::HashMap;
use std::sync::RwLock;
type RpcAccountSubscriptions = RwLock<HashMap<Pubkey, HashMap<SubscriptionId, Sink<Account>>>>;
type RpcSignatureSubscriptions =
RwLock<HashMap<Signature, HashMap<SubscriptionId, Sink<RpcSignatureStatus>>>>;
pub struct RpcSubscriptions {
account_subscriptions: RpcAccountSubscriptions,
signature_subscriptions: RpcSignatureSubscriptions,
}
impl Default for RpcSubscriptions {
fn default() -> Self {
RpcSubscriptions {
account_subscriptions: RpcAccountSubscriptions::default(),
signature_subscriptions: RpcSignatureSubscriptions::default(),
}
}
}
impl RpcSubscriptions {
pub fn check_account(&self, pubkey: &Pubkey, account: &Account) {
let subscriptions = self.account_subscriptions.read().unwrap();
if let Some(hashmap) = subscriptions.get(pubkey) {
for (_bank_sub_id, sink) in hashmap.iter() {
sink.notify(Ok(account.clone())).wait().unwrap();
}
}
}
pub fn check_signature(&self, signature: &Signature, bank_error: &bank::Result<()>) {
let status = match bank_error {
Ok(_) => RpcSignatureStatus::Confirmed,
Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse,
Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError,
Err(_) => RpcSignatureStatus::GenericFailure,
};
let mut subscriptions = self.signature_subscriptions.write().unwrap();
if let Some(hashmap) = subscriptions.get(signature) {
for (_bank_sub_id, sink) in hashmap.iter() {
sink.notify(Ok(status)).wait().unwrap();
}
}
subscriptions.remove(&signature);
}
pub fn add_account_subscription(
&self,
pubkey: &Pubkey,
sub_id: &SubscriptionId,
sink: &Sink<Account>,
) {
let mut subscriptions = self.account_subscriptions.write().unwrap();
if let Some(current_hashmap) = subscriptions.get_mut(pubkey) {
current_hashmap.insert(sub_id.clone(), sink.clone());
return;
}
let mut hashmap = HashMap::new();
hashmap.insert(sub_id.clone(), sink.clone());
subscriptions.insert(*pubkey, hashmap);
}
pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.account_subscriptions.write().unwrap();
let mut found = false;
subscriptions.retain(|_, v| {
v.retain(|k, _| {
if *k == *id {
found = true;
}
!found
});
!v.is_empty()
});
found
}
pub fn add_signature_subscription(
&self,
signature: &Signature,
sub_id: &SubscriptionId,
sink: &Sink<RpcSignatureStatus>,
) {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
if let Some(current_hashmap) = subscriptions.get_mut(signature) {
current_hashmap.insert(sub_id.clone(), sink.clone());
return;
}
let mut hashmap = HashMap::new();
hashmap.insert(sub_id.clone(), sink.clone());
subscriptions.insert(*signature, hashmap);
}
pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
let mut found = false;
subscriptions.retain(|_, v| {
v.retain(|k, _| {
if *k == *id {
found = true;
}
!found
});
!v.is_empty()
});
found
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::genesis_block::GenesisBlock;
use solana_sdk::budget_program;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use tokio::prelude::{Async, Stream};
#[test]
fn test_check_account_subscribe() {
let (genesis_block, mint_keypair) = GenesisBlock::new(100);
let bank = Bank::new(&genesis_block);
let alice = Keypair::new();
let last_id = bank.last_id();
let tx = SystemTransaction::new_program_account(
&mint_keypair,
alice.pubkey(),
last_id,
1,
16,
budget_program::id(),
0,
);
bank.process_transaction(&tx).unwrap();
let (subscriber, _id_receiver, mut transport_receiver) =
Subscriber::new_test("accountNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default();
subscriptions.add_account_subscription(&alice.pubkey(), &sub_id, &sink);
assert!(subscriptions
.account_subscriptions
.write()
.unwrap()
.contains_key(&alice.pubkey()));
let account = bank.get_account(&alice.pubkey()).unwrap();
subscriptions.check_account(&alice.pubkey(), &account);
let string = transport_receiver.poll();
if let Async::Ready(Some(response)) = string.unwrap() {
let expected = format!(r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"executable":false,"owner":[129,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"tokens":1,"userdata":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"subscription":0}}}}"#);
assert_eq!(expected, response);
}
subscriptions.remove_account_subscription(&sub_id);
assert!(!subscriptions
.account_subscriptions
.write()
.unwrap()
.contains_key(&alice.pubkey()));
}
#[test]
fn test_check_signature_subscribe() {
let (genesis_block, mint_keypair) = GenesisBlock::new(100);
let bank = Bank::new(&genesis_block);
let alice = Keypair::new();
let last_id = bank.last_id();
let tx = SystemTransaction::new_move(&mint_keypair, alice.pubkey(), 20, last_id, 0);
let signature = tx.signatures[0];
bank.process_transaction(&tx).unwrap();
let (subscriber, _id_receiver, mut transport_receiver) =
Subscriber::new_test("signatureNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default();
subscriptions.add_signature_subscription(&signature, &sub_id, &sink);
assert!(subscriptions
.signature_subscriptions
.write()
.unwrap()
.contains_key(&signature));
subscriptions.check_signature(&signature, &Ok(()));
let string = transport_receiver.poll();
if let Async::Ready(Some(response)) = string.unwrap() {
let expected = format!(r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":"Confirmed","subscription":0}}}}"#);
assert_eq!(expected, response);
}
subscriptions.remove_signature_subscription(&sub_id);
assert!(!subscriptions
.signature_subscriptions
.write()
.unwrap()
.contains_key(&signature));
}
}