From eb483bc053c141977c9ee5cb2477e3770f98414a Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sun, 17 Feb 2019 10:09:46 -0700 Subject: [PATCH] Move RpcPubSubService into its own module --- src/fullnode.rs | 2 +- src/lib.rs | 1 + src/rpc_pubsub.rs | 108 +++++--------------------------------- src/rpc_pubsub_service.rs | 103 ++++++++++++++++++++++++++++++++++++ 4 files changed, 117 insertions(+), 97 deletions(-) create mode 100644 src/rpc_pubsub_service.rs diff --git a/src/fullnode.rs b/src/fullnode.rs index 7b8d943723..b89e3508d6 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -11,7 +11,7 @@ use crate::gossip_service::GossipService; use crate::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig}; use crate::poh_service::PohServiceConfig; use crate::rpc::JsonRpcService; -use crate::rpc_pubsub::PubSubService; +use crate::rpc_pubsub_service::PubSubService; use crate::service::Service; use crate::storage_stage::StorageState; use crate::tpu::{Tpu, TpuRotationReceiver, TpuRotationSender}; diff --git a/src/lib.rs b/src/lib.rs index 71c08aec0c..0a7a21c4d4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,6 +61,7 @@ pub mod retransmit_stage; pub mod rpc; pub mod rpc_mock; pub mod rpc_pubsub; +pub mod rpc_pubsub_service; pub mod rpc_request; pub mod rpc_subscriptions; pub mod service; diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index be90257171..983d070e0b 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -3,97 +3,19 @@ use crate::bank::Bank; use crate::rpc::RpcSignatureStatus; use crate::rpc_subscriptions::RpcSubscriptions; -use crate::service::Service; use bs58; use jsonrpc_core::futures::Future; use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::typed::Subscriber; use jsonrpc_pubsub::{PubSubHandler, Session, SubscriptionId}; -use jsonrpc_ws_server::{RequestContext, ServerBuilder}; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; use std::mem; -use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{atomic, Arc, RwLock}; -use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; -pub struct PubSubService { - thread_hdl: JoinHandle<()>, - exit: Arc, - rpc_bank: Arc>, - subscription: Arc, -} - -impl Service for PubSubService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { - self.thread_hdl.join() - } -} - -impl PubSubService { - pub fn new(bank: &Arc, pubsub_addr: SocketAddr) -> Self { - info!("rpc_pubsub bound to {:?}", pubsub_addr); - let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(bank.clone()))); - let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); - let subscription = rpc.subscription.clone(); - bank.set_subscriptions(subscription.clone()); - let exit = Arc::new(AtomicBool::new(false)); - let exit_ = exit.clone(); - let thread_hdl = Builder::new() - .name("solana-pubsub".to_string()) - .spawn(move || { - let mut io = PubSubHandler::default(); - io.extend_with(rpc.to_delegate()); - - let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| { - info!("New pubsub connection"); - let session = Arc::new(Session::new(context.sender().clone())); - session.on_drop(|| { - info!("Pubsub connection dropped"); - }); - session - }) - .start(&pubsub_addr); - - if let Err(e) = server { - warn!("Pubsub service unavailable error: {:?}. \nAlso, check that port {} is not already in use by another application", e, pubsub_addr.port()); - return; - } - while !exit_.load(Ordering::Relaxed) { - sleep(Duration::from_millis(100)); - } - server.unwrap().close(); - }) - .unwrap(); - PubSubService { - thread_hdl, - exit, - rpc_bank, - subscription, - } - } - - pub fn set_bank(&self, bank: &Arc) { - self.rpc_bank.write().unwrap().bank = bank.clone(); - bank.set_subscriptions(self.subscription.clone()); - } - - pub fn exit(&self) { - self.exit.store(true, Ordering::Relaxed); - } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() - } -} - #[rpc] pub trait RpcSolPubSub { type Metadata; @@ -133,8 +55,8 @@ pub trait RpcSolPubSub { fn signature_unsubscribe(&self, _: Option, _: SubscriptionId) -> Result; } -struct RpcPubSubBank { - bank: Arc, +pub struct RpcPubSubBank { + pub bank: Arc, } impl RpcPubSubBank { @@ -143,14 +65,14 @@ impl RpcPubSubBank { } } -struct RpcSolPubSubImpl { +pub struct RpcSolPubSubImpl { uid: Arc, bank: Arc>, - subscription: Arc, + pub subscription: Arc, } impl RpcSolPubSubImpl { - fn new(bank: Arc>) -> Self { + pub fn new(bank: Arc>) -> Self { RpcSolPubSubImpl { uid: Arc::new(atomic::AtomicUsize::default()), bank, @@ -158,7 +80,11 @@ impl RpcSolPubSubImpl { } } - fn subscribe_to_account_updates(&self, subscriber: Subscriber, pubkey_str: String) { + pub fn subscribe_to_account_updates( + &self, + subscriber: Subscriber, + pubkey_str: String, + ) { let pubkey_vec = bs58::decode(pubkey_str).into_vec().unwrap(); if pubkey_vec.len() != mem::size_of::() { subscriber @@ -181,7 +107,7 @@ impl RpcSolPubSubImpl { .add_account_subscription(&pubkey, &sub_id, &sink) } - fn subscribe_to_signature_updates( + pub fn subscribe_to_signature_updates( &self, subscriber: Subscriber, signature_str: String, @@ -294,19 +220,9 @@ mod tests { use solana_sdk::budget_transaction::BudgetTransaction; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; - use std::net::{IpAddr, Ipv4Addr}; + use std::thread::sleep; use tokio::prelude::{Async, Stream}; - #[test] - fn test_pubsub_new() { - let (genesis_block, _) = GenesisBlock::new(10_000); - let bank = Bank::new(&genesis_block); - let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr); - let thread = pubsub_service.thread_hdl.thread(); - assert_eq!(thread.name().unwrap(), "solana-pubsub"); - } - #[test] fn test_signature_subscribe() { let (genesis_block, alice) = GenesisBlock::new(10_000); diff --git a/src/rpc_pubsub_service.rs b/src/rpc_pubsub_service.rs new file mode 100644 index 0000000000..fb110c294c --- /dev/null +++ b/src/rpc_pubsub_service.rs @@ -0,0 +1,103 @@ +//! The `pubsub` module implements a threaded subscription service on client RPC request + +use crate::bank::Bank; +use crate::rpc_pubsub::{RpcPubSubBank, RpcSolPubSub, RpcSolPubSubImpl}; +use crate::rpc_subscriptions::RpcSubscriptions; +use crate::service::Service; +use jsonrpc_pubsub::{PubSubHandler, Session}; +use jsonrpc_ws_server::{RequestContext, ServerBuilder}; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::{self, sleep, Builder, JoinHandle}; +use std::time::Duration; + +pub struct PubSubService { + thread_hdl: JoinHandle<()>, + exit: Arc, + rpc_bank: Arc>, + subscription: Arc, +} + +impl Service for PubSubService { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} + +impl PubSubService { + pub fn new(bank: &Arc, pubsub_addr: SocketAddr) -> Self { + info!("rpc_pubsub bound to {:?}", pubsub_addr); + let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(bank.clone()))); + let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); + let subscription = rpc.subscription.clone(); + bank.set_subscriptions(subscription.clone()); + let exit = Arc::new(AtomicBool::new(false)); + let exit_ = exit.clone(); + let thread_hdl = Builder::new() + .name("solana-pubsub".to_string()) + .spawn(move || { + let mut io = PubSubHandler::default(); + io.extend_with(rpc.to_delegate()); + + let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| { + info!("New pubsub connection"); + let session = Arc::new(Session::new(context.sender().clone())); + session.on_drop(|| { + info!("Pubsub connection dropped"); + }); + session + }) + .start(&pubsub_addr); + + if let Err(e) = server { + warn!("Pubsub service unavailable error: {:?}. \nAlso, check that port {} is not already in use by another application", e, pubsub_addr.port()); + return; + } + while !exit_.load(Ordering::Relaxed) { + sleep(Duration::from_millis(100)); + } + server.unwrap().close(); + }) + .unwrap(); + PubSubService { + thread_hdl, + exit, + rpc_bank, + subscription, + } + } + + pub fn set_bank(&self, bank: &Arc) { + self.rpc_bank.write().unwrap().bank = bank.clone(); + bank.set_subscriptions(self.subscription.clone()); + } + + pub fn exit(&self) { + self.exit.store(true, Ordering::Relaxed); + } + + pub fn close(self) -> thread::Result<()> { + self.exit(); + self.join() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::genesis_block::GenesisBlock; + use std::net::{IpAddr, Ipv4Addr}; + + #[test] + fn test_pubsub_new() { + let (genesis_block, _) = GenesisBlock::new(10_000); + let bank = Bank::new(&genesis_block); + let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); + let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr); + let thread = pubsub_service.thread_hdl.thread(); + assert_eq!(thread.name().unwrap(), "solana-pubsub"); + } +}