2019-02-17 09:09:46 -08:00
|
|
|
//! The `pubsub` module implements a threaded subscription service for client RPC requests
|
|
|
|
|
2020-03-30 16:53:25 -07:00
|
|
|
use crate::{
|
|
|
|
rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl},
|
|
|
|
rpc_subscriptions::RpcSubscriptions,
|
|
|
|
};
|
2019-02-17 09:09:46 -08:00
|
|
|
use jsonrpc_pubsub::{PubSubHandler, Session};
|
|
|
|
use jsonrpc_ws_server::{RequestContext, ServerBuilder};
|
2020-03-30 16:53:25 -07:00
|
|
|
use std::{
|
|
|
|
net::SocketAddr,
|
|
|
|
sync::{
|
|
|
|
atomic::{AtomicBool, Ordering},
|
|
|
|
Arc,
|
|
|
|
},
|
|
|
|
thread::{self, sleep, Builder, JoinHandle},
|
|
|
|
time::Duration,
|
|
|
|
};
|
2019-02-17 09:09:46 -08:00
|
|
|
|
2020-10-01 12:36:58 -07:00
|
|
|
/// Connection and buffer limits for the pubsub websocket server.
///
/// Every field corresponds to the setting of the same name in `ws-rs`; see
/// <https://github.com/paritytech/ws-rs/blob/be4d47575bae55c60d9f51b47480d355492a94fc/src/lib.rs#L131>
/// for a complete description of each field in this struct.
#[derive(Debug, Clone)]
pub struct PubSubConfig {
    pub max_connections: usize,
    pub max_fragment_size: usize,
    pub max_in_buffer_capacity: usize,
    pub max_out_buffer_capacity: usize,
}

impl Default for PubSubConfig {
    /// Returns the limits used when the caller supplies no explicit configuration.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;

        Self {
            // Arbitrary; the default of 100 is too low.
            max_connections: 1000,
            // 50KB
            max_fragment_size: 50 * KIB,
            // 50KB
            max_in_buffer_capacity: 50 * KIB,
            // Max account size (10MB), plus 5MB extra for base64 encoding overhead/etc.
            max_out_buffer_capacity: 15 * MIB,
        }
    }
}
|
|
|
|
|
2019-02-17 09:09:46 -08:00
|
|
|
pub struct PubSubService {
|
|
|
|
thread_hdl: JoinHandle<()>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl PubSubService {
|
2019-03-04 15:44:31 -08:00
|
|
|
pub fn new(
|
2020-10-01 12:36:58 -07:00
|
|
|
pubsub_config: PubSubConfig,
|
2019-03-04 15:44:31 -08:00
|
|
|
subscriptions: &Arc<RpcSubscriptions>,
|
|
|
|
pubsub_addr: SocketAddr,
|
|
|
|
exit: &Arc<AtomicBool>,
|
|
|
|
) -> Self {
|
2019-02-17 09:09:46 -08:00
|
|
|
info!("rpc_pubsub bound to {:?}", pubsub_addr);
|
2019-02-18 16:25:17 -08:00
|
|
|
let rpc = RpcSolPubSubImpl::new(subscriptions.clone());
|
2019-02-17 09:09:46 -08:00
|
|
|
let exit_ = exit.clone();
|
2020-10-01 12:36:58 -07:00
|
|
|
|
|
|
|
// TODO: Once https://github.com/paritytech/jsonrpc/pull/594 lands, use
|
|
|
|
// `ServerBuilder::max_in_buffer_capacity()` and `Server::max_out_buffer_capacity() methods
|
|
|
|
// instead of only `ServerBuilder::max_payload`
|
|
|
|
let max_payload = *[
|
|
|
|
pubsub_config.max_fragment_size,
|
|
|
|
pubsub_config.max_in_buffer_capacity,
|
|
|
|
pubsub_config.max_out_buffer_capacity,
|
|
|
|
]
|
|
|
|
.iter()
|
|
|
|
.max()
|
|
|
|
.unwrap();
|
|
|
|
info!("rpc_pubsub max_payload: {}", max_payload);
|
|
|
|
|
2019-02-17 09:09:46 -08:00
|
|
|
let thread_hdl = Builder::new()
|
|
|
|
.name("solana-pubsub".to_string())
|
|
|
|
.spawn(move || {
|
|
|
|
let mut io = PubSubHandler::default();
|
|
|
|
io.extend_with(rpc.to_delegate());
|
|
|
|
|
|
|
|
let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
|
2020-10-01 12:36:58 -07:00
|
|
|
info!("New pubsub connection");
|
|
|
|
let session = Arc::new(Session::new(context.sender()));
|
|
|
|
session.on_drop(|| {
|
|
|
|
info!("Pubsub connection dropped");
|
|
|
|
});
|
|
|
|
session
|
2019-02-17 09:09:46 -08:00
|
|
|
})
|
2020-10-01 12:36:58 -07:00
|
|
|
.max_connections(pubsub_config.max_connections)
|
|
|
|
.max_payload(max_payload)
|
2019-02-17 09:09:46 -08:00
|
|
|
.start(&pubsub_addr);
|
|
|
|
|
|
|
|
if let Err(e) = server {
|
2020-10-01 12:36:58 -07:00
|
|
|
warn!(
|
|
|
|
"Pubsub service unavailable error: {:?}. \n\
|
|
|
|
Also, check that port {} is not already in use by another application",
|
|
|
|
e,
|
|
|
|
pubsub_addr.port()
|
|
|
|
);
|
2019-02-17 09:09:46 -08:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
while !exit_.load(Ordering::Relaxed) {
|
|
|
|
sleep(Duration::from_millis(100));
|
|
|
|
}
|
|
|
|
server.unwrap().close();
|
|
|
|
})
|
|
|
|
.unwrap();
|
2019-03-04 15:44:31 -08:00
|
|
|
Self { thread_hdl }
|
2019-02-17 09:09:46 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn close(self) -> thread::Result<()> {
|
|
|
|
self.join()
|
|
|
|
}
|
2019-11-13 10:12:09 -08:00
|
|
|
|
|
|
|
pub fn join(self) -> thread::Result<()> {
|
|
|
|
self.thread_hdl.join()
|
|
|
|
}
|
2019-02-17 09:09:46 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank;
    use solana_runtime::{
        bank::Bank,
        bank_forks::BankForks,
        commitment::BlockCommitmentCache,
        genesis_utils::{create_genesis_config, GenesisConfigInfo},
    };
    use std::{
        net::{IpAddr, Ipv4Addr},
        sync::RwLock,
    };

    /// Constructing a `PubSubService` must spawn a thread named `solana-pubsub`.
    #[test]
    fn test_pubsub_new() {
        // Unspecified IPv4 address with port 0.
        let any_ip = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
        let pubsub_addr = SocketAddr::new(any_ip, 0);
        let exit = Arc::new(AtomicBool::new(false));

        // Minimal bank state needed to build an `RpcSubscriptions` instance.
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let root_bank = Bank::new(&genesis_config);
        let bank_forks = Arc::new(RwLock::new(BankForks::new(root_bank)));
        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
        let commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests()));
        let subscriptions = Arc::new(RpcSubscriptions::new(
            &exit,
            bank_forks,
            commitment_cache,
            optimistically_confirmed_bank,
        ));

        let service =
            PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr, &exit);

        let service_thread = service.thread_hdl.thread();
        assert_eq!(service_thread.name(), Some("solana-pubsub"));
    }
}
|