Drop RpcPubSubService's dependency on the Bank

Pass in RpcSubscriptions instead, which let's you choose a
bank fork when it's time to send notifications.
This commit is contained in:
Greg Fitzgerald 2019-02-18 17:25:17 -07:00
parent 905b1e2775
commit 5916177dc8
3 changed files with 17 additions and 48 deletions

View File

@ -12,6 +12,7 @@ use crate::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig};
use crate::poh_service::PohServiceConfig;
use crate::rpc_pubsub_service::PubSubService;
use crate::rpc_service::JsonRpcService;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::storage_stage::StorageState;
use crate::tpu::{Tpu, TpuRotationReceiver};
@ -168,8 +169,9 @@ impl Fullnode {
storage_state.clone(),
);
let subscriptions = Arc::new(RpcSubscriptions::default());
let rpc_pubsub_service = PubSubService::new(
&bank,
&subscriptions,
SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
node.info.rpc_pubsub.port(),

View File

@ -1,10 +1,8 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::bank::Bank;
use crate::rpc_status::RpcSignatureStatus;
use crate::rpc_subscriptions::RpcSubscriptions;
use bs58;
use jsonrpc_core::futures::Future;
use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::typed::Subscriber;
@ -54,24 +52,16 @@ pub trait RpcSolPubSub {
fn signature_unsubscribe(&self, _: Option<Self::Metadata>, _: SubscriptionId) -> Result<bool>;
}
#[derive(Default)]
pub struct RpcSolPubSubImpl {
uid: Arc<atomic::AtomicUsize>,
bank: Arc<Bank>,
subscriptions: Arc<RpcSubscriptions>,
}
impl RpcSolPubSubImpl {
pub fn new_with_subscriptions(bank: Arc<Bank>, subscriptions: Arc<RpcSubscriptions>) -> Self {
pub fn new(subscriptions: Arc<RpcSubscriptions>) -> Self {
let uid = Arc::new(atomic::AtomicUsize::default());
Self {
uid,
bank,
subscriptions,
}
}
pub fn new(bank: Arc<Bank>) -> Self {
Self::new_with_subscriptions(bank, Arc::new(RpcSubscriptions::default()))
Self { uid, subscriptions }
}
}
@ -146,23 +136,8 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
let sub_id = SubscriptionId::Number(id as u64);
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let status = self.bank.get_signature_status(&signature);
if status.is_none() {
self.subscriptions
.add_signature_subscription(&signature, &sub_id, &sink);
return;
}
match status.unwrap() {
Ok(_) => {
sink.notify(Ok(RpcSignatureStatus::Confirmed))
.wait()
.unwrap();
}
_ => self
.subscriptions
.add_signature_subscription(&signature, &sub_id, &sink),
}
self.subscriptions
.add_signature_subscription(&signature, &sub_id, &sink);
}
fn signature_unsubscribe(
@ -186,7 +161,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
#[cfg(test)]
mod tests {
use super::*;
use crate::bank;
use crate::bank::{self, Bank};
use crate::genesis_block::GenesisBlock;
use jsonrpc_core::futures::sync::mpsc;
use jsonrpc_core::Response;
@ -225,7 +200,7 @@ mod tests {
let arc_bank = Arc::new(bank);
let last_id = arc_bank.last_id();
let rpc = RpcSolPubSubImpl::new(arc_bank.clone());
let rpc = RpcSolPubSubImpl::default();
// Test signature subscriptions
let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0);
@ -257,7 +232,7 @@ mod tests {
let session = create_session();
let mut io = PubSubHandler::default();
let rpc = RpcSolPubSubImpl::new(arc_bank.clone());
let rpc = RpcSolPubSubImpl::default();
io.extend_with(rpc.to_delegate());
let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0);
@ -301,7 +276,7 @@ mod tests {
let arc_bank = Arc::new(bank);
let last_id = arc_bank.last_id();
let rpc = RpcSolPubSubImpl::new(arc_bank.clone());
let rpc = RpcSolPubSubImpl::default();
let session = create_session();
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification");
rpc.account_subscribe(session, subscriber, contract_state.pubkey().to_string());
@ -428,15 +403,11 @@ mod tests {
#[test]
fn test_account_unsubscribe() {
let (genesis_block, _) = GenesisBlock::new(10_000);
let bob_pubkey = Keypair::new().pubkey();
let bank = Bank::new(&genesis_block);
let arc_bank = Arc::new(bank);
let session = create_session();
let mut io = PubSubHandler::default();
let rpc = RpcSolPubSubImpl::new(arc_bank.clone());
let rpc = RpcSolPubSubImpl::default();
io.extend_with(rpc.to_delegate());

View File

@ -1,6 +1,5 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::bank::Bank;
use crate::rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl};
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
@ -26,10 +25,9 @@ impl Service for PubSubService {
}
impl PubSubService {
pub fn new(bank: &Arc<Bank>, pubsub_addr: SocketAddr) -> Self {
pub fn new(subscriptions: &Arc<RpcSubscriptions>, pubsub_addr: SocketAddr) -> Self {
info!("rpc_pubsub bound to {:?}", pubsub_addr);
let subscriptions = Arc::new(RpcSubscriptions::default());
let rpc = RpcSolPubSubImpl::new_with_subscriptions(bank.clone(), subscriptions.clone());
let rpc = RpcSolPubSubImpl::new(subscriptions.clone());
let exit = Arc::new(AtomicBool::new(false));
let exit_ = exit.clone();
let thread_hdl = Builder::new()
@ -74,15 +72,13 @@ impl PubSubService {
#[cfg(test)]
mod tests {
use super::*;
use crate::genesis_block::GenesisBlock;
use std::net::{IpAddr, Ipv4Addr};
#[test]
fn test_pubsub_new() {
let (genesis_block, _) = GenesisBlock::new(10_000);
let bank = Bank::new(&genesis_block);
let subscriptions = Arc::new(RpcSubscriptions::default());
let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr);
let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr);
let thread = pubsub_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-pubsub");
}