Move RpcPubSubService into its own module

This commit is contained in:
Greg Fitzgerald 2019-02-17 10:09:46 -07:00
parent 50d3fa7437
commit eb483bc053
4 changed files with 117 additions and 97 deletions

View File

@ -11,7 +11,7 @@ use crate::gossip_service::GossipService;
use crate::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig};
use crate::poh_service::PohServiceConfig;
use crate::rpc::JsonRpcService;
use crate::rpc_pubsub::PubSubService;
use crate::rpc_pubsub_service::PubSubService;
use crate::service::Service;
use crate::storage_stage::StorageState;
use crate::tpu::{Tpu, TpuRotationReceiver, TpuRotationSender};

View File

@ -61,6 +61,7 @@ pub mod retransmit_stage;
pub mod rpc;
pub mod rpc_mock;
pub mod rpc_pubsub;
pub mod rpc_pubsub_service;
pub mod rpc_request;
pub mod rpc_subscriptions;
pub mod service;

View File

@ -3,97 +3,19 @@
use crate::bank::Bank;
use crate::rpc::RpcSignatureStatus;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use bs58;
use jsonrpc_core::futures::Future;
use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::typed::Subscriber;
use jsonrpc_pubsub::{PubSubHandler, Session, SubscriptionId};
use jsonrpc_ws_server::{RequestContext, ServerBuilder};
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use std::mem;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{atomic, Arc, RwLock};
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration;
pub struct PubSubService {
thread_hdl: JoinHandle<()>,
exit: Arc<AtomicBool>,
rpc_bank: Arc<RwLock<RpcPubSubBank>>,
subscription: Arc<RpcSubscriptions>,
}
impl Service for PubSubService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
impl PubSubService {
pub fn new(bank: &Arc<Bank>, pubsub_addr: SocketAddr) -> Self {
info!("rpc_pubsub bound to {:?}", pubsub_addr);
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let subscription = rpc.subscription.clone();
bank.set_subscriptions(subscription.clone());
let exit = Arc::new(AtomicBool::new(false));
let exit_ = exit.clone();
let thread_hdl = Builder::new()
.name("solana-pubsub".to_string())
.spawn(move || {
let mut io = PubSubHandler::default();
io.extend_with(rpc.to_delegate());
let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
info!("New pubsub connection");
let session = Arc::new(Session::new(context.sender().clone()));
session.on_drop(|| {
info!("Pubsub connection dropped");
});
session
})
.start(&pubsub_addr);
if let Err(e) = server {
warn!("Pubsub service unavailable error: {:?}. \nAlso, check that port {} is not already in use by another application", e, pubsub_addr.port());
return;
}
while !exit_.load(Ordering::Relaxed) {
sleep(Duration::from_millis(100));
}
server.unwrap().close();
})
.unwrap();
PubSubService {
thread_hdl,
exit,
rpc_bank,
subscription,
}
}
pub fn set_bank(&self, bank: &Arc<Bank>) {
self.rpc_bank.write().unwrap().bank = bank.clone();
bank.set_subscriptions(self.subscription.clone());
}
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);
}
pub fn close(self) -> thread::Result<()> {
self.exit();
self.join()
}
}
#[rpc]
pub trait RpcSolPubSub {
type Metadata;
@ -133,8 +55,8 @@ pub trait RpcSolPubSub {
fn signature_unsubscribe(&self, _: Option<Self::Metadata>, _: SubscriptionId) -> Result<bool>;
}
struct RpcPubSubBank {
bank: Arc<Bank>,
pub struct RpcPubSubBank {
pub bank: Arc<Bank>,
}
impl RpcPubSubBank {
@ -143,14 +65,14 @@ impl RpcPubSubBank {
}
}
struct RpcSolPubSubImpl {
pub struct RpcSolPubSubImpl {
uid: Arc<atomic::AtomicUsize>,
bank: Arc<RwLock<RpcPubSubBank>>,
subscription: Arc<RpcSubscriptions>,
pub subscription: Arc<RpcSubscriptions>,
}
impl RpcSolPubSubImpl {
fn new(bank: Arc<RwLock<RpcPubSubBank>>) -> Self {
pub fn new(bank: Arc<RwLock<RpcPubSubBank>>) -> Self {
RpcSolPubSubImpl {
uid: Arc::new(atomic::AtomicUsize::default()),
bank,
@ -158,7 +80,11 @@ impl RpcSolPubSubImpl {
}
}
fn subscribe_to_account_updates(&self, subscriber: Subscriber<Account>, pubkey_str: String) {
pub fn subscribe_to_account_updates(
&self,
subscriber: Subscriber<Account>,
pubkey_str: String,
) {
let pubkey_vec = bs58::decode(pubkey_str).into_vec().unwrap();
if pubkey_vec.len() != mem::size_of::<Pubkey>() {
subscriber
@ -181,7 +107,7 @@ impl RpcSolPubSubImpl {
.add_account_subscription(&pubkey, &sub_id, &sink)
}
fn subscribe_to_signature_updates(
pub fn subscribe_to_signature_updates(
&self,
subscriber: Subscriber<RpcSignatureStatus>,
signature_str: String,
@ -294,19 +220,9 @@ mod tests {
use solana_sdk::budget_transaction::BudgetTransaction;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use std::net::{IpAddr, Ipv4Addr};
use std::thread::sleep;
use tokio::prelude::{Async, Stream};
#[test]
fn test_pubsub_new() {
let (genesis_block, _) = GenesisBlock::new(10_000);
let bank = Bank::new(&genesis_block);
let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr);
let thread = pubsub_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-pubsub");
}
#[test]
fn test_signature_subscribe() {
let (genesis_block, alice) = GenesisBlock::new(10_000);

103
src/rpc_pubsub_service.rs Normal file
View File

@ -0,0 +1,103 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::bank::Bank;
use crate::rpc_pubsub::{RpcPubSubBank, RpcSolPubSub, RpcSolPubSubImpl};
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use jsonrpc_pubsub::{PubSubHandler, Session};
use jsonrpc_ws_server::{RequestContext, ServerBuilder};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration;
pub struct PubSubService {
thread_hdl: JoinHandle<()>,
exit: Arc<AtomicBool>,
rpc_bank: Arc<RwLock<RpcPubSubBank>>,
subscription: Arc<RpcSubscriptions>,
}
impl Service for PubSubService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
impl PubSubService {
pub fn new(bank: &Arc<Bank>, pubsub_addr: SocketAddr) -> Self {
info!("rpc_pubsub bound to {:?}", pubsub_addr);
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let subscription = rpc.subscription.clone();
bank.set_subscriptions(subscription.clone());
let exit = Arc::new(AtomicBool::new(false));
let exit_ = exit.clone();
let thread_hdl = Builder::new()
.name("solana-pubsub".to_string())
.spawn(move || {
let mut io = PubSubHandler::default();
io.extend_with(rpc.to_delegate());
let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
info!("New pubsub connection");
let session = Arc::new(Session::new(context.sender().clone()));
session.on_drop(|| {
info!("Pubsub connection dropped");
});
session
})
.start(&pubsub_addr);
if let Err(e) = server {
warn!("Pubsub service unavailable error: {:?}. \nAlso, check that port {} is not already in use by another application", e, pubsub_addr.port());
return;
}
while !exit_.load(Ordering::Relaxed) {
sleep(Duration::from_millis(100));
}
server.unwrap().close();
})
.unwrap();
PubSubService {
thread_hdl,
exit,
rpc_bank,
subscription,
}
}
pub fn set_bank(&self, bank: &Arc<Bank>) {
self.rpc_bank.write().unwrap().bank = bank.clone();
bank.set_subscriptions(self.subscription.clone());
}
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);
}
pub fn close(self) -> thread::Result<()> {
self.exit();
self.join()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::genesis_block::GenesisBlock;
use std::net::{IpAddr, Ipv4Addr};
#[test]
fn test_pubsub_new() {
let (genesis_block, _) = GenesisBlock::new(10_000);
let bank = Bank::new(&genesis_block);
let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr);
let thread = pubsub_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-pubsub");
}
}