From 40aa0654fa012c835c2de2e33c29b288b4f89aea Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Mon, 10 Sep 2018 17:35:07 -0700 Subject: [PATCH] move rpc_server to drop() semantics instead of having its own thread --- src/fullnode.rs | 17 ++++++----- src/rpc.rs | 75 +++++++++++++++++++++---------------------------- 2 files changed, 40 insertions(+), 52 deletions(-) diff --git a/src/fullnode.rs b/src/fullnode.rs index eaeb7f3523..7c03985e79 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -24,6 +24,7 @@ use window; pub struct Fullnode { exit: Arc, thread_hdls: Vec>, + _rpc_service: JsonRpcService, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -190,14 +191,8 @@ impl Fullnode { let mut drone_addr = node.info.contact_info.tpu; drone_addr.set_port(DRONE_PORT); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), RPC_PORT); - let rpc_service = JsonRpcService::new( - &bank, - node.info.contact_info.tpu, - drone_addr, - rpc_addr, - exit.clone(), - ); - thread_hdls.extend(rpc_service.thread_hdls()); + let _rpc_service = + JsonRpcService::new(&bank, node.info.contact_info.tpu, drone_addr, rpc_addr); let blob_recycler = BlobRecycler::default(); let window = @@ -266,7 +261,11 @@ impl Fullnode { } } - Fullnode { exit, thread_hdls } + Fullnode { + exit, + thread_hdls, + _rpc_service, + } } //used for notifying many nodes in parallel to exit diff --git a/src/rpc.rs b/src/rpc.rs index ee9a368f46..5ecf2ec869 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -5,13 +5,11 @@ use bincode::deserialize; use bs58; use jsonrpc_core::*; use jsonrpc_http_server::*; -use service::Service; use signature::{Pubkey, Signature}; use std::mem; use std::net::{SocketAddr, UdpSocket}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::thread::{self, sleep, Builder, JoinHandle}; +use std::thread::sleep; use std::time::Duration; use std::time::Instant; use transaction::Transaction; @@ -20,7 +18,7 @@ use wallet::request_airdrop; pub const RPC_PORT: u16 = 8899; pub struct JsonRpcService { - thread_hdl: JoinHandle<()>, + server: Option, } impl JsonRpcService { @@ -29,51 +27,42 @@ impl JsonRpcService { transactions_addr: SocketAddr, drone_addr: SocketAddr, rpc_addr: SocketAddr, - exit: Arc, ) -> Self { let request_processor = JsonRpcRequestProcessor::new(bank.clone()); - let thread_hdl = Builder::new() - .name("solana-jsonrpc".to_string()) - .spawn(move || { - let mut io = MetaIoHandler::default(); - let rpc = RpcSolImpl; - io.extend_with(rpc.to_delegate()); + let mut io = MetaIoHandler::default(); + let rpc = RpcSolImpl; + io.extend_with(rpc.to_delegate()); - let server = - ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request| Meta { - request_processor: request_processor.clone(), - transactions_addr, - drone_addr, - }).threads(4) - .cors(DomainsValidation::AllowOnly(vec![ - AccessControlAllowOrigin::Any, - ])) - .start_http(&rpc_addr); - if server.is_err() { - warn!("JSON RPC service unavailable: unable to bind to RPC port {}. \nMake sure this port is not already in use by another application", rpc_addr.port()); - return; - } - loop { - if exit.load(Ordering::Relaxed) { - server.unwrap().close(); - break; - } - sleep(Duration::from_millis(100)); - } - () - }) - .unwrap(); - JsonRpcService { thread_hdl } - } -} + let server = ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request| Meta { + request_processor: request_processor.clone(), + transactions_addr, + drone_addr, + }).threads(4) + .cors(DomainsValidation::AllowOnly(vec![ + AccessControlAllowOrigin::Any, + ])) + .start_http(&rpc_addr); -impl Service for JsonRpcService { - fn thread_hdls(self) -> Vec> { - vec![self.thread_hdl] + let server = match server { + Ok(server) => Some(server), + Err(e) => { + warn!( + "JSON RPC service unavailable: error {}. \n\ + Make sure the RPC port {} is not already in use by another application", + e, + rpc_addr.port() + ); + None + } + }; + + JsonRpcService { server } } - fn join(self) -> thread::Result<()> { - self.thread_hdl.join() + pub fn close(self) { + if let Some(server) = self.server { + server.close(); + } } }