move rpc_server to drop() semantics instead of having its own thread
This commit is contained in:
parent
bace2880d0
commit
40aa0654fa
|
@ -24,6 +24,7 @@ use window;
|
||||||
pub struct Fullnode {
|
pub struct Fullnode {
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
thread_hdls: Vec<JoinHandle<()>>,
|
thread_hdls: Vec<JoinHandle<()>>,
|
||||||
|
_rpc_service: JsonRpcService,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||||
|
@ -190,14 +191,8 @@ impl Fullnode {
|
||||||
let mut drone_addr = node.info.contact_info.tpu;
|
let mut drone_addr = node.info.contact_info.tpu;
|
||||||
drone_addr.set_port(DRONE_PORT);
|
drone_addr.set_port(DRONE_PORT);
|
||||||
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), RPC_PORT);
|
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), RPC_PORT);
|
||||||
let rpc_service = JsonRpcService::new(
|
let _rpc_service =
|
||||||
&bank,
|
JsonRpcService::new(&bank, node.info.contact_info.tpu, drone_addr, rpc_addr);
|
||||||
node.info.contact_info.tpu,
|
|
||||||
drone_addr,
|
|
||||||
rpc_addr,
|
|
||||||
exit.clone(),
|
|
||||||
);
|
|
||||||
thread_hdls.extend(rpc_service.thread_hdls());
|
|
||||||
|
|
||||||
let blob_recycler = BlobRecycler::default();
|
let blob_recycler = BlobRecycler::default();
|
||||||
let window =
|
let window =
|
||||||
|
@ -266,7 +261,11 @@ impl Fullnode {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Fullnode { exit, thread_hdls }
|
Fullnode {
|
||||||
|
exit,
|
||||||
|
thread_hdls,
|
||||||
|
_rpc_service,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//used for notifying many nodes in parallel to exit
|
//used for notifying many nodes in parallel to exit
|
||||||
|
|
53
src/rpc.rs
53
src/rpc.rs
|
@ -5,13 +5,11 @@ use bincode::deserialize;
|
||||||
use bs58;
|
use bs58;
|
||||||
use jsonrpc_core::*;
|
use jsonrpc_core::*;
|
||||||
use jsonrpc_http_server::*;
|
use jsonrpc_http_server::*;
|
||||||
use service::Service;
|
|
||||||
use signature::{Pubkey, Signature};
|
use signature::{Pubkey, Signature};
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::thread::{self, sleep, Builder, JoinHandle};
|
use std::thread::sleep;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
@ -20,7 +18,7 @@ use wallet::request_airdrop;
|
||||||
pub const RPC_PORT: u16 = 8899;
|
pub const RPC_PORT: u16 = 8899;
|
||||||
|
|
||||||
pub struct JsonRpcService {
|
pub struct JsonRpcService {
|
||||||
thread_hdl: JoinHandle<()>,
|
server: Option<Server>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl JsonRpcService {
|
impl JsonRpcService {
|
||||||
|
@ -29,18 +27,13 @@ impl JsonRpcService {
|
||||||
transactions_addr: SocketAddr,
|
transactions_addr: SocketAddr,
|
||||||
drone_addr: SocketAddr,
|
drone_addr: SocketAddr,
|
||||||
rpc_addr: SocketAddr,
|
rpc_addr: SocketAddr,
|
||||||
exit: Arc<AtomicBool>,
|
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let request_processor = JsonRpcRequestProcessor::new(bank.clone());
|
let request_processor = JsonRpcRequestProcessor::new(bank.clone());
|
||||||
let thread_hdl = Builder::new()
|
|
||||||
.name("solana-jsonrpc".to_string())
|
|
||||||
.spawn(move || {
|
|
||||||
let mut io = MetaIoHandler::default();
|
let mut io = MetaIoHandler::default();
|
||||||
let rpc = RpcSolImpl;
|
let rpc = RpcSolImpl;
|
||||||
io.extend_with(rpc.to_delegate());
|
io.extend_with(rpc.to_delegate());
|
||||||
|
|
||||||
let server =
|
let server = ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request| Meta {
|
||||||
ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request| Meta {
|
|
||||||
request_processor: request_processor.clone(),
|
request_processor: request_processor.clone(),
|
||||||
transactions_addr,
|
transactions_addr,
|
||||||
drone_addr,
|
drone_addr,
|
||||||
|
@ -49,31 +42,27 @@ impl JsonRpcService {
|
||||||
AccessControlAllowOrigin::Any,
|
AccessControlAllowOrigin::Any,
|
||||||
]))
|
]))
|
||||||
.start_http(&rpc_addr);
|
.start_http(&rpc_addr);
|
||||||
if server.is_err() {
|
|
||||||
warn!("JSON RPC service unavailable: unable to bind to RPC port {}. \nMake sure this port is not already in use by another application", rpc_addr.port());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
loop {
|
|
||||||
if exit.load(Ordering::Relaxed) {
|
|
||||||
server.unwrap().close();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
sleep(Duration::from_millis(100));
|
|
||||||
}
|
|
||||||
()
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
JsonRpcService { thread_hdl }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Service for JsonRpcService {
|
let server = match server {
|
||||||
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
|
Ok(server) => Some(server),
|
||||||
vec![self.thread_hdl]
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
"JSON RPC service unavailable: error {}. \n\
|
||||||
|
Make sure the RPC port {} is not already in use by another application",
|
||||||
|
e,
|
||||||
|
rpc_addr.port()
|
||||||
|
);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
JsonRpcService { server }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn join(self) -> thread::Result<()> {
|
pub fn close(self) {
|
||||||
self.thread_hdl.join()
|
if let Some(server) = self.server {
|
||||||
|
server.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue