From 5916177dc8e39148087740b8acb451c962feaee3 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 18 Feb 2019 17:25:17 -0700 Subject: [PATCH] Drop RpcPubSubService's dependency on the Bank Pass in RpcSubscriptions instead, which let's you choose a bank fork when it's time to send notifications. --- src/fullnode.rs | 4 +++- src/rpc_pubsub.rs | 49 ++++++++------------------------------- src/rpc_pubsub_service.rs | 12 ++++------ 3 files changed, 17 insertions(+), 48 deletions(-) diff --git a/src/fullnode.rs b/src/fullnode.rs index 4f1791a214..9eb4a18432 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -12,6 +12,7 @@ use crate::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig}; use crate::poh_service::PohServiceConfig; use crate::rpc_pubsub_service::PubSubService; use crate::rpc_service::JsonRpcService; +use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use crate::storage_stage::StorageState; use crate::tpu::{Tpu, TpuRotationReceiver}; @@ -168,8 +169,9 @@ impl Fullnode { storage_state.clone(), ); + let subscriptions = Arc::new(RpcSubscriptions::default()); let rpc_pubsub_service = PubSubService::new( - &bank, + &subscriptions, SocketAddr::new( IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc_pubsub.port(), diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index 00a6306741..e31dfdb3b7 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -1,10 +1,8 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::bank::Bank; use crate::rpc_status::RpcSignatureStatus; use crate::rpc_subscriptions::RpcSubscriptions; use bs58; -use jsonrpc_core::futures::Future; use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::typed::Subscriber; @@ -54,24 +52,16 @@ pub trait RpcSolPubSub { fn signature_unsubscribe(&self, _: Option, _: SubscriptionId) -> Result; } +#[derive(Default)] pub struct RpcSolPubSubImpl { uid: Arc, - bank: Arc, subscriptions: Arc, } impl RpcSolPubSubImpl { - pub fn new_with_subscriptions(bank: Arc, subscriptions: Arc) -> Self { + pub fn new(subscriptions: Arc) -> Self { let uid = Arc::new(atomic::AtomicUsize::default()); - Self { - uid, - bank, - subscriptions, - } - } - - pub fn new(bank: Arc) -> Self { - Self::new_with_subscriptions(bank, Arc::new(RpcSubscriptions::default())) + Self { uid, subscriptions } } } @@ -146,23 +136,8 @@ impl RpcSolPubSub for RpcSolPubSubImpl { let sub_id = SubscriptionId::Number(id as u64); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let status = self.bank.get_signature_status(&signature); - if status.is_none() { - self.subscriptions - .add_signature_subscription(&signature, &sub_id, &sink); - return; - } - - match status.unwrap() { - Ok(_) => { - sink.notify(Ok(RpcSignatureStatus::Confirmed)) - .wait() - .unwrap(); - } - _ => self - .subscriptions - .add_signature_subscription(&signature, &sub_id, &sink), - } + self.subscriptions + .add_signature_subscription(&signature, &sub_id, &sink); } fn signature_unsubscribe( @@ -186,7 +161,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { #[cfg(test)] mod tests { use super::*; - use crate::bank; + use crate::bank::{self, Bank}; use crate::genesis_block::GenesisBlock; use jsonrpc_core::futures::sync::mpsc; use jsonrpc_core::Response; @@ -225,7 +200,7 @@ mod tests { let arc_bank = Arc::new(bank); let last_id = arc_bank.last_id(); - let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); + let rpc = RpcSolPubSubImpl::default(); // Test signature subscriptions let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0); @@ -257,7 +232,7 @@ mod tests { let session = create_session(); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); + let rpc = RpcSolPubSubImpl::default(); io.extend_with(rpc.to_delegate()); let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0); @@ -301,7 +276,7 @@ mod tests { let arc_bank = Arc::new(bank); let last_id = arc_bank.last_id(); - let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); + let rpc = RpcSolPubSubImpl::default(); let session = create_session(); let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe(session, subscriber, contract_state.pubkey().to_string()); @@ -428,15 +403,11 @@ mod tests { #[test] fn test_account_unsubscribe() { - let (genesis_block, _) = GenesisBlock::new(10_000); let bob_pubkey = Keypair::new().pubkey(); - let bank = Bank::new(&genesis_block); - let arc_bank = Arc::new(bank); - let session = create_session(); let mut io = PubSubHandler::default(); - let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); + let rpc = RpcSolPubSubImpl::default(); io.extend_with(rpc.to_delegate()); diff --git a/src/rpc_pubsub_service.rs b/src/rpc_pubsub_service.rs index d846f1ad10..672760e202 100644 --- a/src/rpc_pubsub_service.rs +++ b/src/rpc_pubsub_service.rs @@ -1,6 +1,5 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::bank::Bank; use crate::rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl}; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; @@ -26,10 +25,9 @@ impl Service for PubSubService { } impl PubSubService { - pub fn new(bank: &Arc, pubsub_addr: SocketAddr) -> Self { + pub fn new(subscriptions: &Arc, pubsub_addr: SocketAddr) -> Self { info!("rpc_pubsub bound to {:?}", pubsub_addr); - let subscriptions = Arc::new(RpcSubscriptions::default()); - let rpc = RpcSolPubSubImpl::new_with_subscriptions(bank.clone(), subscriptions.clone()); + let rpc = RpcSolPubSubImpl::new(subscriptions.clone()); let exit = Arc::new(AtomicBool::new(false)); let exit_ = exit.clone(); let thread_hdl = Builder::new() @@ -74,15 +72,13 @@ impl PubSubService { #[cfg(test)] mod tests { use super::*; - use crate::genesis_block::GenesisBlock; use std::net::{IpAddr, Ipv4Addr}; #[test] fn test_pubsub_new() { - let (genesis_block, _) = GenesisBlock::new(10_000); - let bank = Bank::new(&genesis_block); + let subscriptions = Arc::new(RpcSubscriptions::default()); let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr); + let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr); let thread = pubsub_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-pubsub"); }