2019-02-17 09:09:46 -08:00
|
|
|
//! The `pubsub` module implements a threaded subscription service for client RPC requests.
|
|
|
|
|
2019-02-17 12:51:12 -08:00
|
|
|
use crate::rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl};
|
2019-02-17 09:09:46 -08:00
|
|
|
use crate::rpc_subscriptions::RpcSubscriptions;
|
|
|
|
use jsonrpc_pubsub::{PubSubHandler, Session};
|
|
|
|
use jsonrpc_ws_server::{RequestContext, ServerBuilder};
|
|
|
|
use std::net::SocketAddr;
|
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
2019-02-17 12:51:12 -08:00
|
|
|
use std::sync::Arc;
|
2019-02-17 09:09:46 -08:00
|
|
|
use std::thread::{self, sleep, Builder, JoinHandle};
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
/// Handle to the background thread spawned by `PubSubService::new`.
pub struct PubSubService {
    /// Join handle of the service thread; consumed when the service is joined.
    thread_hdl: JoinHandle<()>,
}
|
|
|
|
|
|
|
|
impl PubSubService {
|
2019-03-04 15:44:31 -08:00
|
|
|
pub fn new(
|
|
|
|
subscriptions: &Arc<RpcSubscriptions>,
|
|
|
|
pubsub_addr: SocketAddr,
|
|
|
|
exit: &Arc<AtomicBool>,
|
|
|
|
) -> Self {
|
2019-02-17 09:09:46 -08:00
|
|
|
info!("rpc_pubsub bound to {:?}", pubsub_addr);
|
2019-02-18 16:25:17 -08:00
|
|
|
let rpc = RpcSolPubSubImpl::new(subscriptions.clone());
|
2019-02-17 09:09:46 -08:00
|
|
|
let exit_ = exit.clone();
|
|
|
|
let thread_hdl = Builder::new()
|
|
|
|
.name("solana-pubsub".to_string())
|
|
|
|
.spawn(move || {
|
|
|
|
let mut io = PubSubHandler::default();
|
|
|
|
io.extend_with(rpc.to_delegate());
|
|
|
|
|
|
|
|
let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
|
|
|
|
info!("New pubsub connection");
|
2019-12-19 23:27:54 -08:00
|
|
|
let session = Arc::new(Session::new(context.sender()));
|
2019-02-17 09:09:46 -08:00
|
|
|
session.on_drop(|| {
|
|
|
|
info!("Pubsub connection dropped");
|
|
|
|
});
|
|
|
|
session
|
|
|
|
})
|
|
|
|
.start(&pubsub_addr);
|
|
|
|
|
|
|
|
if let Err(e) = server {
|
|
|
|
warn!("Pubsub service unavailable error: {:?}. \nAlso, check that port {} is not already in use by another application", e, pubsub_addr.port());
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
while !exit_.load(Ordering::Relaxed) {
|
|
|
|
sleep(Duration::from_millis(100));
|
|
|
|
}
|
|
|
|
server.unwrap().close();
|
|
|
|
})
|
|
|
|
.unwrap();
|
2019-03-04 15:44:31 -08:00
|
|
|
Self { thread_hdl }
|
2019-02-17 09:09:46 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn close(self) -> thread::Result<()> {
|
|
|
|
self.join()
|
|
|
|
}
|
2019-11-13 10:12:09 -08:00
|
|
|
|
|
|
|
pub fn join(self) -> thread::Result<()> {
|
|
|
|
self.thread_hdl.join()
|
|
|
|
}
|
2019-02-17 09:09:46 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Constructing the service spawns a thread named `solana-pubsub`.
    #[test]
    fn test_pubsub_new() {
        let subscriptions = Arc::new(RpcSubscriptions::default());
        // Unspecified address (0.0.0.0) with port 0.
        let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
        let exit = Arc::new(AtomicBool::new(false));

        let service = PubSubService::new(&subscriptions, pubsub_addr, &exit);

        let thread_name = service.thread_hdl.thread().name();
        assert_eq!(thread_name, Some("solana-pubsub"));
    }
}
|