Move subscriptions to rpc_pubsub (#2490)

* Move subscriptions to rpc_pubsub

- this helps avoid recreating pubsub_service on node's role change

* fixed tests and addressed review comments

* fix clippy errors

* address review comments
This commit is contained in:
Pankaj Garg 2019-01-21 09:59:09 -08:00 committed by GitHub
parent abbb037888
commit 6611188edf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 291 additions and 252 deletions

View File

@ -8,16 +8,13 @@ use crate::checkpoint::Checkpoint;
use crate::counter::Counter; use crate::counter::Counter;
use crate::entry::Entry; use crate::entry::Entry;
use crate::entry::EntrySlice; use crate::entry::EntrySlice;
use crate::jsonrpc_macros::pubsub::Sink;
use crate::leader_scheduler::LeaderScheduler; use crate::leader_scheduler::LeaderScheduler;
use crate::mint::Mint; use crate::mint::Mint;
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use crate::rpc::RpcSignatureStatus;
use crate::runtime::{self, RuntimeError}; use crate::runtime::{self, RuntimeError};
use crate::status_deque::{Status, StatusDeque, MAX_ENTRY_IDS}; use crate::status_deque::{Status, StatusDeque, MAX_ENTRY_IDS};
use crate::storage_stage::StorageState; use crate::storage_stage::StorageState;
use bincode::deserialize; use bincode::deserialize;
use hashbrown::HashMap;
use itertools::Itertools; use itertools::Itertools;
use log::Level; use log::Level;
use rayon::prelude::*; use rayon::prelude::*;
@ -44,7 +41,6 @@ use std::result;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Instant; use std::time::Instant;
use tokio::prelude::Future;
/// Reasons a transaction might be rejected. /// Reasons a transaction might be rejected.
#[derive(Debug, PartialEq, Eq, Clone)] #[derive(Debug, PartialEq, Eq, Clone)]
@ -87,6 +83,23 @@ pub type Result<T> = result::Result<T, BankError>;
pub const VERIFY_BLOCK_SIZE: usize = 16; pub const VERIFY_BLOCK_SIZE: usize = 16;
pub trait BankSubscriptions {
fn check_account(&self, pubkey: &Pubkey, account: &Account);
fn check_signature(&self, signature: &Signature, status: &Result<()>);
}
struct LocalSubscriptions {}
impl Default for LocalSubscriptions {
fn default() -> Self {
LocalSubscriptions {}
}
}
impl BankSubscriptions for LocalSubscriptions {
fn check_account(&self, _pubkey: &Pubkey, _account: &Account) {}
fn check_signature(&self, _signature: &Signature, _status: &Result<()>) {}
}
/// Manager for the state of all accounts and programs after processing its entries. /// Manager for the state of all accounts and programs after processing its entries.
pub struct Bank { pub struct Bank {
pub accounts: Accounts, pub accounts: Accounts,
@ -97,17 +110,13 @@ pub struct Bank {
// The latest confirmation time for the network // The latest confirmation time for the network
confirmation_time: AtomicUsize, confirmation_time: AtomicUsize,
// Mapping of account ids to Subscriber ids and sinks to notify on userdata update
account_subscriptions: RwLock<HashMap<Pubkey, HashMap<Pubkey, Sink<Account>>>>,
// Mapping of signatures to Subscriber ids and sinks to notify on confirmation
signature_subscriptions: RwLock<HashMap<Signature, HashMap<Pubkey, Sink<RpcSignatureStatus>>>>,
/// Tracks and updates the leader schedule based on the votes and account stakes /// Tracks and updates the leader schedule based on the votes and account stakes
/// processed by the bank /// processed by the bank
pub leader_scheduler: Arc<RwLock<LeaderScheduler>>, pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
pub storage_state: StorageState, pub storage_state: StorageState,
subscriptions: RwLock<Box<Arc<BankSubscriptions + Send + Sync>>>,
} }
impl Default for Bank { impl Default for Bank {
@ -116,10 +125,9 @@ impl Default for Bank {
accounts: Accounts::default(), accounts: Accounts::default(),
last_ids: RwLock::new(StatusDeque::default()), last_ids: RwLock::new(StatusDeque::default()),
confirmation_time: AtomicUsize::new(std::usize::MAX), confirmation_time: AtomicUsize::new(std::usize::MAX),
account_subscriptions: RwLock::new(HashMap::new()),
signature_subscriptions: RwLock::new(HashMap::new()),
leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())), leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())),
storage_state: StorageState::new(), storage_state: StorageState::new(),
subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))),
} }
} }
} }
@ -145,6 +153,11 @@ impl Bank {
bank bank
} }
pub fn set_subscriptions(&self, subscriptions: Box<Arc<BankSubscriptions + Send + Sync>>) {
let mut sub = self.subscriptions.write().unwrap();
*sub = subscriptions
}
pub fn checkpoint(&self) { pub fn checkpoint(&self) {
self.accounts.checkpoint(); self.accounts.checkpoint();
self.last_ids.write().unwrap().checkpoint(); self.last_ids.write().unwrap().checkpoint();
@ -160,7 +173,10 @@ impl Bank {
rolled_back_pubkeys.iter().for_each(|pubkey| { rolled_back_pubkeys.iter().for_each(|pubkey| {
if let Some(account) = self.accounts.load_slow(&pubkey) { if let Some(account) = self.accounts.load_slow(&pubkey) {
self.check_account_subscriptions(&pubkey, &account) self.subscriptions
.read()
.unwrap()
.check_account(&pubkey, &account)
} }
}); });
@ -330,15 +346,10 @@ impl Bank {
let mut last_ids = self.last_ids.write().unwrap(); let mut last_ids = self.last_ids.write().unwrap();
for (i, tx) in txs.iter().enumerate() { for (i, tx) in txs.iter().enumerate() {
last_ids.update_signature_status_with_last_id(&tx.signatures[0], &res[i], &tx.last_id); last_ids.update_signature_status_with_last_id(&tx.signatures[0], &res[i], &tx.last_id);
let status = match res[i] { self.subscriptions
Ok(_) => RpcSignatureStatus::Confirmed, .read()
Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse, .unwrap()
Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError, .check_signature(&tx.signatures[0], &res[i]);
Err(_) => RpcSignatureStatus::GenericFailure,
};
if status != RpcSignatureStatus::SignatureNotFound {
self.check_signature_subscriptions(&tx.signatures[0], status);
}
} }
} }
@ -887,41 +898,14 @@ impl Bank {
let tx = &txs[i]; let tx = &txs[i];
let accs = raccs.as_ref().unwrap(); let accs = raccs.as_ref().unwrap();
for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) { for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) {
self.check_account_subscriptions(&key, account); self.subscriptions
.read()
.unwrap()
.check_account(&key, account);
} }
} }
} }
pub fn add_account_subscription(
&self,
bank_sub_id: Pubkey,
pubkey: Pubkey,
sink: Sink<Account>,
) {
let mut subscriptions = self.account_subscriptions.write().unwrap();
if let Some(current_hashmap) = subscriptions.get_mut(&pubkey) {
current_hashmap.insert(bank_sub_id, sink);
return;
}
let mut hashmap = HashMap::new();
hashmap.insert(bank_sub_id, sink);
subscriptions.insert(pubkey, hashmap);
}
pub fn remove_account_subscription(&self, bank_sub_id: &Pubkey, pubkey: &Pubkey) -> bool {
let mut subscriptions = self.account_subscriptions.write().unwrap();
match subscriptions.get_mut(pubkey) {
Some(ref current_hashmap) if current_hashmap.len() == 1 => {}
Some(current_hashmap) => {
return current_hashmap.remove(bank_sub_id).is_some();
}
None => {
return false;
}
}
subscriptions.remove(pubkey).is_some()
}
pub fn get_current_leader(&self) -> Option<(Pubkey, u64)> { pub fn get_current_leader(&self) -> Option<(Pubkey, u64)> {
self.leader_scheduler self.leader_scheduler
.read() .read()
@ -932,66 +916,12 @@ impl Bank {
pub fn tick_height(&self) -> u64 { pub fn tick_height(&self) -> u64 {
self.last_ids.read().unwrap().tick_height self.last_ids.read().unwrap().tick_height
} }
fn check_account_subscriptions(&self, pubkey: &Pubkey, account: &Account) {
let subscriptions = self.account_subscriptions.read().unwrap();
if let Some(hashmap) = subscriptions.get(pubkey) {
for (_bank_sub_id, sink) in hashmap.iter() {
sink.notify(Ok(account.clone())).wait().unwrap();
}
}
}
pub fn add_signature_subscription(
&self,
bank_sub_id: Pubkey,
signature: Signature,
sink: Sink<RpcSignatureStatus>,
) {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
if let Some(current_hashmap) = subscriptions.get_mut(&signature) {
current_hashmap.insert(bank_sub_id, sink);
return;
}
let mut hashmap = HashMap::new();
hashmap.insert(bank_sub_id, sink);
subscriptions.insert(signature, hashmap);
}
pub fn remove_signature_subscription(
&self,
bank_sub_id: &Pubkey,
signature: &Signature,
) -> bool {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
match subscriptions.get_mut(signature) {
Some(ref current_hashmap) if current_hashmap.len() == 1 => {}
Some(current_hashmap) => {
return current_hashmap.remove(bank_sub_id).is_some();
}
None => {
return false;
}
}
subscriptions.remove(signature).is_some()
}
fn check_signature_subscriptions(&self, signature: &Signature, status: RpcSignatureStatus) {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
if let Some(hashmap) = subscriptions.get(signature) {
for (_bank_sub_id, sink) in hashmap.iter() {
sink.notify(Ok(status)).wait().unwrap();
}
}
subscriptions.remove(&signature);
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::entry::{next_entries, next_entry, Entry}; use crate::entry::{next_entries, next_entry, Entry};
use crate::jsonrpc_macros::pubsub::{Subscriber, SubscriptionId};
use crate::signature::GenKeys; use crate::signature::GenKeys;
use crate::status_deque; use crate::status_deque;
use crate::status_deque::StatusDequeError; use crate::status_deque::StatusDequeError;
@ -1006,7 +936,6 @@ mod tests {
use solana_sdk::transaction::Instruction; use solana_sdk::transaction::Instruction;
use std; use std;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use tokio::prelude::{Async, Stream};
#[test] #[test]
fn test_bank_new() { fn test_bank_new() {
@ -1447,90 +1376,7 @@ mod tests {
Ok(_) Ok(_)
); );
} }
#[test]
fn test_bank_account_subscribe() {
let mint = Mint::new(100);
let bank = Bank::new(&mint);
let alice = Keypair::new();
let bank_sub_id = Keypair::new().pubkey();
let last_id = bank.last_id();
let tx = Transaction::system_create(
&mint.keypair(),
alice.pubkey(),
last_id,
1,
16,
budget_program::id(),
0,
);
bank.process_transaction(&tx).unwrap();
let (subscriber, _id_receiver, mut transport_receiver) =
Subscriber::new_test("accountNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
bank.add_account_subscription(bank_sub_id, alice.pubkey(), sink);
assert!(bank
.account_subscriptions
.write()
.unwrap()
.contains_key(&alice.pubkey()));
let account = bank.get_account(&alice.pubkey()).unwrap();
bank.check_account_subscriptions(&alice.pubkey(), &account);
let string = transport_receiver.poll();
assert!(string.is_ok());
if let Async::Ready(Some(response)) = string.unwrap() {
let expected = format!(r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"executable":false,"loader":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"owner":[129,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"tokens":1,"userdata":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"subscription":0}}}}"#);
assert_eq!(expected, response);
}
bank.remove_account_subscription(&bank_sub_id, &alice.pubkey());
assert!(!bank
.account_subscriptions
.write()
.unwrap()
.contains_key(&alice.pubkey()));
}
#[test]
fn test_bank_signature_subscribe() {
let mint = Mint::new(100);
let bank = Bank::new(&mint);
let alice = Keypair::new();
let bank_sub_id = Keypair::new().pubkey();
let last_id = bank.last_id();
let tx = Transaction::system_move(&mint.keypair(), alice.pubkey(), 20, last_id, 0);
let signature = tx.signatures[0];
bank.process_transaction(&tx).unwrap();
let (subscriber, _id_receiver, mut transport_receiver) =
Subscriber::new_test("signatureNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
bank.add_signature_subscription(bank_sub_id, signature, sink);
assert!(bank
.signature_subscriptions
.write()
.unwrap()
.contains_key(&signature));
bank.check_signature_subscriptions(&signature, RpcSignatureStatus::Confirmed);
let string = transport_receiver.poll();
assert!(string.is_ok());
if let Async::Ready(Some(response)) = string.unwrap() {
let expected = format!(r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":"Confirmed","subscription":0}}}}"#);
assert_eq!(expected, response);
}
bank.remove_signature_subscription(&bank_sub_id, &signature);
assert!(!bank
.signature_subscriptions
.write()
.unwrap()
.contains_key(&signature));
}
#[test] #[test]
fn test_first_err() { fn test_first_err() {
assert_eq!(Bank::first_err(&[Ok(())]), Ok(())); assert_eq!(Bank::first_err(&[Ok(())]), Ok(()));

View File

@ -104,7 +104,6 @@ pub struct Fullnode {
retransmit_socket: UdpSocket, retransmit_socket: UdpSocket,
tpu_sockets: Vec<UdpSocket>, tpu_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
rpc_pubsub_addr: SocketAddr,
db_ledger: Arc<DbLedger>, db_ledger: Arc<DbLedger>,
vote_signer: Arc<VoteSignerProxy>, vote_signer: Arc<VoteSignerProxy>,
} }
@ -370,7 +369,6 @@ impl Fullnode {
retransmit_socket: node.sockets.retransmit, retransmit_socket: node.sockets.retransmit,
tpu_sockets: node.sockets.tpu, tpu_sockets: node.sockets.tpu,
broadcast_socket: node.sockets.broadcast, broadcast_socket: node.sockets.broadcast,
rpc_pubsub_addr,
db_ledger, db_ledger,
vote_signer, vote_signer,
} }
@ -379,12 +377,6 @@ impl Fullnode {
fn leader_to_validator(&mut self) -> Result<()> { fn leader_to_validator(&mut self) -> Result<()> {
trace!("leader_to_validator"); trace!("leader_to_validator");
// Close down any services that could have a reference to the bank
if self.rpc_pubsub_service.is_some() {
let old_rpc_pubsub_service = self.rpc_pubsub_service.take().unwrap();
old_rpc_pubsub_service.close()?;
}
// Correctness check: Ensure that references to the bank and leader scheduler are no // Correctness check: Ensure that references to the bank and leader scheduler are no
// longer held by any running thread // longer held by any running thread
let mut new_leader_scheduler = self.bank.leader_scheduler.read().unwrap().clone(); let mut new_leader_scheduler = self.bank.leader_scheduler.read().unwrap().clone();
@ -418,15 +410,9 @@ impl Fullnode {
rpc_service.set_bank(&new_bank); rpc_service.set_bank(&new_bank);
} }
// TODO: Don't restart PubSubService on leader rotation if let Some(ref mut rpc_pubsub_service) = self.rpc_pubsub_service {
// See https://github.com/solana-labs/solana/issues/2419 rpc_pubsub_service.set_bank(&new_bank);
self.rpc_pubsub_service = Some(PubSubService::new( }
&new_bank,
SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
self.rpc_pubsub_addr.port(),
),
));
self.bank = new_bank; self.bank = new_bank;

View File

@ -1,9 +1,11 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request //! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::bank::Bank; use crate::bank;
use crate::bank::{Bank, BankError, BankSubscriptions};
use crate::jsonrpc_core::futures::Future; use crate::jsonrpc_core::futures::Future;
use crate::jsonrpc_core::*; use crate::jsonrpc_core::*;
use crate::jsonrpc_macros::pubsub; use crate::jsonrpc_macros::pubsub;
use crate::jsonrpc_macros::pubsub::Sink;
use crate::jsonrpc_pubsub::{PubSubHandler, Session, SubscriptionId}; use crate::jsonrpc_pubsub::{PubSubHandler, Session, SubscriptionId};
use crate::jsonrpc_ws_server::{RequestContext, Sender, ServerBuilder}; use crate::jsonrpc_ws_server::{RequestContext, Sender, ServerBuilder};
use crate::rpc::RpcSignatureStatus; use crate::rpc::RpcSignatureStatus;
@ -12,7 +14,7 @@ use crate::status_deque::Status;
use bs58; use bs58;
use solana_sdk::account::Account; use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::signature::Signature;
use std::collections::HashMap; use std::collections::HashMap;
use std::mem; use std::mem;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -29,6 +31,8 @@ pub enum ClientState {
pub struct PubSubService { pub struct PubSubService {
thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
rpc_bank: Arc<RwLock<RpcPubSubBank>>,
subscription: Arc<RpcSubscriptions>,
} }
impl Service for PubSubService { impl Service for PubSubService {
@ -42,7 +46,10 @@ impl Service for PubSubService {
impl PubSubService { impl PubSubService {
pub fn new(bank: &Arc<Bank>, pubsub_addr: SocketAddr) -> Self { pub fn new(bank: &Arc<Bank>, pubsub_addr: SocketAddr) -> Self {
info!("rpc_pubsub bound to {:?}", pubsub_addr); info!("rpc_pubsub bound to {:?}", pubsub_addr);
let rpc = RpcSolPubSubImpl::new(bank.clone()); let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let subscription = rpc.subscription.clone();
bank.set_subscriptions(Box::new(subscription.clone()));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let exit_ = exit.clone(); let exit_ = exit.clone();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
@ -71,7 +78,17 @@ impl PubSubService {
server.unwrap().close(); server.unwrap().close();
}) })
.unwrap(); .unwrap();
PubSubService { thread_hdl, exit } PubSubService {
thread_hdl,
exit,
rpc_bank,
subscription,
}
}
pub fn set_bank(&self, bank: &Arc<Bank>) {
self.rpc_bank.write().unwrap().bank = bank.clone();
bank.set_subscriptions(Box::new(self.subscription.clone()));
} }
pub fn exit(&self) { pub fn exit(&self) {
@ -111,20 +128,135 @@ build_rpc_trait! {
} }
} }
struct RpcPubSubBank {
bank: Arc<Bank>,
}
impl RpcPubSubBank {
pub fn new(bank: Arc<Bank>) -> Self {
RpcPubSubBank { bank }
}
}
pub struct RpcSubscriptions {
account_subscriptions: RwLock<HashMap<Pubkey, HashMap<SubscriptionId, Sink<Account>>>>,
signature_subscriptions:
RwLock<HashMap<Signature, HashMap<SubscriptionId, Sink<RpcSignatureStatus>>>>,
}
impl Default for RpcSubscriptions {
fn default() -> Self {
RpcSubscriptions {
account_subscriptions: Default::default(),
signature_subscriptions: Default::default(),
}
}
}
impl BankSubscriptions for RpcSubscriptions {
fn check_account(&self, pubkey: &Pubkey, account: &Account) {
let subscriptions = self.account_subscriptions.read().unwrap();
if let Some(hashmap) = subscriptions.get(pubkey) {
for (_bank_sub_id, sink) in hashmap.iter() {
sink.notify(Ok(account.clone())).wait().unwrap();
}
}
}
fn check_signature(&self, signature: &Signature, bank_error: &bank::Result<()>) {
let status = match bank_error {
Ok(_) => RpcSignatureStatus::Confirmed,
Err(BankError::AccountInUse) => RpcSignatureStatus::AccountInUse,
Err(BankError::ProgramError(_, _)) => RpcSignatureStatus::ProgramRuntimeError,
Err(_) => RpcSignatureStatus::GenericFailure,
};
let mut subscriptions = self.signature_subscriptions.write().unwrap();
if let Some(hashmap) = subscriptions.get(signature) {
for (_bank_sub_id, sink) in hashmap.iter() {
sink.notify(Ok(status)).wait().unwrap();
}
}
subscriptions.remove(&signature);
}
}
impl RpcSubscriptions {
pub fn add_account_subscription(
&self,
pubkey: &Pubkey,
sub_id: &SubscriptionId,
sink: &Sink<Account>,
) {
let mut subscriptions = self.account_subscriptions.write().unwrap();
if let Some(current_hashmap) = subscriptions.get_mut(pubkey) {
current_hashmap.insert(sub_id.clone(), sink.clone());
return;
}
let mut hashmap = HashMap::new();
hashmap.insert(sub_id.clone(), sink.clone());
subscriptions.insert(*pubkey, hashmap);
}
pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.account_subscriptions.write().unwrap();
let mut found = false;
subscriptions.retain(|_, v| {
v.retain(|k, _| {
if *k == *id {
found = true;
}
!found
});
!v.is_empty()
});
found
}
pub fn add_signature_subscription(
&self,
signature: &Signature,
sub_id: &SubscriptionId,
sink: &Sink<RpcSignatureStatus>,
) {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
if let Some(current_hashmap) = subscriptions.get_mut(signature) {
current_hashmap.insert(sub_id.clone(), sink.clone());
return;
}
let mut hashmap = HashMap::new();
hashmap.insert(sub_id.clone(), sink.clone());
subscriptions.insert(*signature, hashmap);
}
pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.signature_subscriptions.write().unwrap();
let mut found = false;
subscriptions.retain(|_, v| {
v.retain(|k, _| {
if *k == *id {
found = true;
}
!found
});
!v.is_empty()
});
found
}
}
struct RpcSolPubSubImpl { struct RpcSolPubSubImpl {
uid: Arc<atomic::AtomicUsize>, uid: Arc<atomic::AtomicUsize>,
bank: Arc<Bank>, bank: Arc<RwLock<RpcPubSubBank>>,
account_subscriptions: Arc<RwLock<HashMap<SubscriptionId, (Pubkey, Pubkey)>>>, subscription: Arc<RpcSubscriptions>,
signature_subscriptions: Arc<RwLock<HashMap<SubscriptionId, (Pubkey, Signature)>>>,
} }
impl RpcSolPubSubImpl { impl RpcSolPubSubImpl {
fn new(bank: Arc<Bank>) -> Self { fn new(bank: Arc<RwLock<RpcPubSubBank>>) -> Self {
RpcSolPubSubImpl { RpcSolPubSubImpl {
uid: Default::default(), uid: Default::default(),
bank, bank,
account_subscriptions: Default::default(), subscription: Arc::new(Default::default()),
signature_subscriptions: Default::default(),
} }
} }
} }
@ -155,21 +287,14 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
let sub_id = SubscriptionId::Number(id as u64); let sub_id = SubscriptionId::Number(id as u64);
info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id); info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id);
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let bank_sub_id = Keypair::new().pubkey();
self.account_subscriptions
.write()
.unwrap()
.insert(sub_id.clone(), (bank_sub_id, pubkey));
self.bank self.subscription
.add_account_subscription(bank_sub_id, pubkey, sink); .add_account_subscription(&pubkey, &sub_id, &sink)
} }
fn account_unsubscribe(&self, id: SubscriptionId) -> Result<bool> { fn account_unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
info!("account_unsubscribe: id={:?}", id); info!("account_unsubscribe: id={:?}", id);
if let Some((bank_sub_id, pubkey)) = self.account_subscriptions.write().unwrap().remove(&id) if self.subscription.remove_account_subscription(&id) {
{
self.bank.remove_account_subscription(&bank_sub_id, &pubkey);
Ok(true) Ok(true)
} else { } else {
Err(Error { Err(Error {
@ -199,18 +324,19 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
return; return;
} }
let signature = Signature::new(&signature_vec); let signature = Signature::new(&signature_vec);
let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst); let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
let sub_id = SubscriptionId::Number(id as u64); let sub_id = SubscriptionId::Number(id as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let bank_sub_id = Keypair::new().pubkey();
self.signature_subscriptions
.write()
.unwrap()
.insert(sub_id.clone(), (bank_sub_id, signature));
let status = self.bank.get_signature_status(&signature); let status = self
.bank
.read()
.unwrap()
.bank
.get_signature_status(&signature);
if status.is_none() { if status.is_none() {
self.subscription
.add_signature_subscription(&signature, &sub_id, &sink);
return; return;
} }
@ -219,25 +345,16 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
sink.notify(Ok(RpcSignatureStatus::Confirmed)) sink.notify(Ok(RpcSignatureStatus::Confirmed))
.wait() .wait()
.unwrap(); .unwrap();
self.signature_subscriptions
.write()
.unwrap()
.remove(&sub_id);
}
_ => {
self.bank
.add_signature_subscription(bank_sub_id, signature, sink);
} }
_ => self
.subscription
.add_signature_subscription(&signature, &sub_id, &sink),
} }
} }
fn signature_unsubscribe(&self, id: SubscriptionId) -> Result<bool> { fn signature_unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
info!("signature_unsubscribe"); info!("signature_unsubscribe");
if let Some((bank_sub_id, signature)) = if self.subscription.remove_signature_subscription(&id) {
self.signature_subscriptions.write().unwrap().remove(&id)
{
self.bank
.remove_signature_subscription(&bank_sub_id, &signature);
Ok(true) Ok(true)
} else { } else {
Err(Error { Err(Error {
@ -253,6 +370,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
mod tests { mod tests {
use super::*; use super::*;
use crate::jsonrpc_core::futures::sync::mpsc; use crate::jsonrpc_core::futures::sync::mpsc;
use crate::jsonrpc_macros::pubsub::{Subscriber, SubscriptionId};
use crate::mint::Mint; use crate::mint::Mint;
use solana_sdk::budget_program; use solana_sdk::budget_program;
use solana_sdk::budget_transaction::BudgetTransaction; use solana_sdk::budget_transaction::BudgetTransaction;
@ -286,7 +404,8 @@ mod tests {
let session = Arc::new(Session::new(sender)); let session = Arc::new(Session::new(sender));
let mut io = PubSubHandler::default(); let mut io = PubSubHandler::default();
let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
io.extend_with(rpc.to_delegate()); io.extend_with(rpc.to_delegate());
// Test signature subscription // Test signature subscription
@ -359,7 +478,8 @@ mod tests {
let session = Arc::new(Session::new(sender)); let session = Arc::new(Session::new(sender));
let mut io = PubSubHandler::default(); let mut io = PubSubHandler::default();
let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
io.extend_with(rpc.to_delegate()); io.extend_with(rpc.to_delegate());
let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0); let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0);
@ -413,7 +533,8 @@ mod tests {
let session = Arc::new(Session::new(sender)); let session = Arc::new(Session::new(sender));
let mut io = PubSubHandler::default(); let mut io = PubSubHandler::default();
let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
io.extend_with(rpc.to_delegate()); io.extend_with(rpc.to_delegate());
let req = format!( let req = format!(
@ -591,7 +712,8 @@ mod tests {
let session = Arc::new(Session::new(sender)); let session = Arc::new(Session::new(sender));
let mut io = PubSubHandler::default(); let mut io = PubSubHandler::default();
let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
io.extend_with(rpc.to_delegate()); io.extend_with(rpc.to_delegate());
@ -625,4 +747,89 @@ mod tests {
.expect("actual response deserialization"); .expect("actual response deserialization");
assert_eq!(expected, result); assert_eq!(expected, result);
} }
#[test]
fn test_check_account_subscribe() {
let mint = Mint::new(100);
let bank = Bank::new(&mint);
let alice = Keypair::new();
let last_id = bank.last_id();
let tx = Transaction::system_create(
&mint.keypair(),
alice.pubkey(),
last_id,
1,
16,
budget_program::id(),
0,
);
bank.process_transaction(&tx).unwrap();
let (subscriber, _id_receiver, mut transport_receiver) =
Subscriber::new_test("accountNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default();
subscriptions.add_account_subscription(&alice.pubkey(), &sub_id, &sink);
assert!(subscriptions
.account_subscriptions
.write()
.unwrap()
.contains_key(&alice.pubkey()));
let account = bank.get_account(&alice.pubkey()).unwrap();
subscriptions.check_account(&alice.pubkey(), &account);
let string = transport_receiver.poll();
assert!(string.is_ok());
if let Async::Ready(Some(response)) = string.unwrap() {
let expected = format!(r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"executable":false,"loader":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"owner":[129,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"tokens":1,"userdata":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"subscription":0}}}}"#);
assert_eq!(expected, response);
}
subscriptions.remove_account_subscription(&sub_id);
assert!(!subscriptions
.account_subscriptions
.write()
.unwrap()
.contains_key(&alice.pubkey()));
}
#[test]
fn test_check_signature_subscribe() {
let mint = Mint::new(100);
let bank = Bank::new(&mint);
let alice = Keypair::new();
let last_id = bank.last_id();
let tx = Transaction::system_move(&mint.keypair(), alice.pubkey(), 20, last_id, 0);
let signature = tx.signatures[0];
bank.process_transaction(&tx).unwrap();
let (subscriber, _id_receiver, mut transport_receiver) =
Subscriber::new_test("signatureNotification");
let sub_id = SubscriptionId::Number(0 as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let subscriptions = RpcSubscriptions::default();
subscriptions.add_signature_subscription(&signature, &sub_id, &sink);
assert!(subscriptions
.signature_subscriptions
.write()
.unwrap()
.contains_key(&signature));
subscriptions.check_signature(&signature, &Ok(()));
let string = transport_receiver.poll();
assert!(string.is_ok());
if let Async::Ready(Some(response)) = string.unwrap() {
let expected = format!(r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":"Confirmed","subscription":0}}}}"#);
assert_eq!(expected, response);
}
subscriptions.remove_signature_subscription(&sub_id);
assert!(!subscriptions
.signature_subscriptions
.write()
.unwrap()
.contains_key(&signature));
}
} }