Delete useless wrappers

This commit is contained in:
Greg Fitzgerald 2019-02-17 13:51:12 -07:00
parent 5003e97479
commit 6d67568037
2 changed files with 33 additions and 60 deletions

View File

@ -13,7 +13,7 @@ use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use std::mem;
use std::sync::{atomic, Arc, RwLock};
use std::sync::{atomic, Arc};
#[rpc]
pub trait RpcSolPubSub {
@ -54,30 +54,25 @@ pub trait RpcSolPubSub {
fn signature_unsubscribe(&self, _: Option<Self::Metadata>, _: SubscriptionId) -> Result<bool>;
}
pub struct RpcPubSubBank {
pub bank: Arc<Bank>,
}
impl RpcPubSubBank {
pub fn new(bank: Arc<Bank>) -> Self {
RpcPubSubBank { bank }
}
}
pub struct RpcSolPubSubImpl {
uid: Arc<atomic::AtomicUsize>,
bank: Arc<RwLock<RpcPubSubBank>>,
pub subscription: Arc<RpcSubscriptions>,
bank: Arc<Bank>,
subscriptions: Arc<RpcSubscriptions>,
}
impl RpcSolPubSubImpl {
pub fn new(bank: Arc<RwLock<RpcPubSubBank>>) -> Self {
RpcSolPubSubImpl {
uid: Arc::new(atomic::AtomicUsize::default()),
pub fn new_with_subscriptions(bank: Arc<Bank>, subscriptions: Arc<RpcSubscriptions>) -> Self {
let uid = Arc::new(atomic::AtomicUsize::default());
Self {
uid,
bank,
subscription: Arc::new(RpcSubscriptions::default()),
subscriptions,
}
}
pub fn new(bank: Arc<Bank>) -> Self {
Self::new_with_subscriptions(bank, Arc::new(RpcSubscriptions::default()))
}
}
impl RpcSolPubSub for RpcSolPubSubImpl {
@ -107,7 +102,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
self.subscription
self.subscriptions
.add_account_subscription(&pubkey, &sub_id, &sink)
}
@ -117,7 +112,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
id: SubscriptionId,
) -> Result<bool> {
info!("account_unsubscribe: id={:?}", id);
if self.subscription.remove_account_subscription(&id) {
if self.subscriptions.remove_account_subscription(&id) {
Ok(true)
} else {
Err(Error {
@ -151,14 +146,9 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
let sub_id = SubscriptionId::Number(id as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let status = self
.bank
.read()
.unwrap()
.bank
.get_signature_status(&signature);
let status = self.bank.get_signature_status(&signature);
if status.is_none() {
self.subscription
self.subscriptions
.add_signature_subscription(&signature, &sub_id, &sink);
return;
}
@ -170,7 +160,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
.unwrap();
}
_ => self
.subscription
.subscriptions
.add_signature_subscription(&signature, &sub_id, &sink),
}
}
@ -181,7 +171,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
id: SubscriptionId,
) -> Result<bool> {
info!("signature_unsubscribe");
if self.subscription.remove_signature_subscription(&id) {
if self.subscriptions.remove_signature_subscription(&id) {
Ok(true)
} else {
Err(Error {
@ -221,12 +211,11 @@ mod tests {
let arc_bank = Arc::new(bank);
let last_id = arc_bank.last_id();
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let subscription = rpc.subscription.clone();
arc_bank.set_subscriptions(subscription);
let rpc = RpcSolPubSubImpl::new(arc_bank.clone());
let subscriptions = rpc.subscriptions.clone();
arc_bank.set_subscriptions(subscriptions);
// Test signature subscription
// Test signature subscriptions
let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0);
let session = create_session();
@ -258,8 +247,7 @@ mod tests {
let session = create_session();
let mut io = PubSubHandler::default();
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let rpc = RpcSolPubSubImpl::new(arc_bank.clone());
io.extend_with(rpc.to_delegate());
let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0);
@ -307,10 +295,9 @@ mod tests {
let arc_bank = Arc::new(bank);
let last_id = arc_bank.last_id();
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let subscription = rpc.subscription.clone();
arc_bank.set_subscriptions(subscription);
let rpc = RpcSolPubSubImpl::new(arc_bank.clone());
let subscriptions = rpc.subscriptions.clone();
arc_bank.set_subscriptions(subscriptions);
let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification");
@ -457,8 +444,7 @@ mod tests {
let session = create_session();
let mut io = PubSubHandler::default();
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let rpc = RpcSolPubSubImpl::new(arc_bank.clone());
io.extend_with(rpc.to_delegate());

View File

@ -1,22 +1,20 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::bank::Bank;
use crate::rpc_pubsub::{RpcPubSubBank, RpcSolPubSub, RpcSolPubSubImpl};
use crate::rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl};
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use jsonrpc_pubsub::{PubSubHandler, Session};
use jsonrpc_ws_server::{RequestContext, ServerBuilder};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration;
pub struct PubSubService {
thread_hdl: JoinHandle<()>,
exit: Arc<AtomicBool>,
rpc_bank: Arc<RwLock<RpcPubSubBank>>,
subscription: Arc<RpcSubscriptions>,
}
impl Service for PubSubService {
@ -30,10 +28,9 @@ impl Service for PubSubService {
impl PubSubService {
pub fn new(bank: &Arc<Bank>, pubsub_addr: SocketAddr) -> Self {
info!("rpc_pubsub bound to {:?}", pubsub_addr);
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let subscription = rpc.subscription.clone();
bank.set_subscriptions(subscription.clone());
let subscriptions = Arc::new(RpcSubscriptions::default());
let rpc = RpcSolPubSubImpl::new_with_subscriptions(bank.clone(), subscriptions.clone());
bank.set_subscriptions(subscriptions);
let exit = Arc::new(AtomicBool::new(false));
let exit_ = exit.clone();
let thread_hdl = Builder::new()
@ -62,17 +59,7 @@ impl PubSubService {
server.unwrap().close();
})
.unwrap();
PubSubService {
thread_hdl,
exit,
rpc_bank,
subscription,
}
}
pub fn set_bank(&self, bank: &Arc<Bank>) {
self.rpc_bank.write().unwrap().bank = bank.clone();
bank.set_subscriptions(self.subscription.clone());
Self { thread_hdl, exit }
}
pub fn exit(&self) {