2019-02-17 09:09:46 -08:00
|
|
|
//! The `pubsub` module implements a threaded subscription service for client RPC requests
|
|
|
|
|
2021-05-18 23:54:28 -07:00
|
|
|
use {
|
|
|
|
crate::{
|
2021-06-16 21:28:23 -07:00
|
|
|
rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl, MAX_ACTIVE_SUBSCRIPTIONS},
|
2021-05-18 23:54:28 -07:00
|
|
|
rpc_subscriptions::RpcSubscriptions,
|
|
|
|
},
|
|
|
|
jsonrpc_pubsub::{PubSubHandler, Session},
|
|
|
|
jsonrpc_ws_server::{RequestContext, ServerBuilder},
|
|
|
|
std::{
|
|
|
|
net::SocketAddr,
|
|
|
|
sync::{
|
|
|
|
atomic::{AtomicBool, Ordering},
|
|
|
|
Arc,
|
|
|
|
},
|
|
|
|
thread::{self, sleep, Builder, JoinHandle},
|
|
|
|
time::Duration,
|
2020-03-30 16:53:25 -07:00
|
|
|
},
|
|
|
|
};
|
2019-02-17 09:09:46 -08:00
|
|
|
|
2020-10-01 12:36:58 -07:00
|
|
|
/// Tunable settings for the pubsub websocket service.
///
/// An instance is handed to `PubSubService::new`, which forwards the
/// connection and buffer limits to the websocket server builder.
#[derive(Clone, Debug)]
pub struct PubSubConfig {
    /// Not read by `PubSubService::new`.
    /// NOTE(review): presumably toggles the vote subscription; confirm against the RPC layer.
    pub enable_vote_subscription: bool,

    // See the corresponding fields in
    // https://github.com/paritytech/ws-rs/blob/be4d47575bae55c60d9f51b47480d355492a94fc/src/lib.rs#L131
    // for a complete description of each field in this struct
    /// Forwarded to the server builder's `max_connections`.
    pub max_connections: usize,
    /// Forwarded to the server builder's `max_payload`.
    pub max_fragment_size: usize,
    /// Forwarded to the server builder's `max_in_buffer_capacity`.
    pub max_in_buffer_capacity: usize,
    /// Forwarded to the server builder's `max_out_buffer_capacity`.
    pub max_out_buffer_capacity: usize,
    /// Passed to `RpcSolPubSubImpl::new` when the service is created.
    pub max_active_subscriptions: usize,
}
|
|
|
|
|
|
|
|
impl Default for PubSubConfig {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self {
|
2020-11-14 09:29:51 -08:00
|
|
|
enable_vote_subscription: false,
|
|
|
|
max_connections: 1000, // Arbitrary, default of 100 is too low
|
|
|
|
max_fragment_size: 50 * 1024, // 50KB
|
2020-10-01 12:36:58 -07:00
|
|
|
max_in_buffer_capacity: 50 * 1024, // 50KB
|
|
|
|
max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc
|
2021-06-16 21:28:23 -07:00
|
|
|
max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
|
2020-10-01 12:36:58 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-17 09:09:46 -08:00
|
|
|
/// Handle to the background thread that runs the pubsub websocket server.
pub struct PubSubService {
    /// Join handle of the thread spawned by `PubSubService::new`.
    thread_hdl: JoinHandle<()>,
}
|
|
|
|
|
|
|
|
impl PubSubService {
|
2019-03-04 15:44:31 -08:00
|
|
|
pub fn new(
|
2020-10-01 12:36:58 -07:00
|
|
|
pubsub_config: PubSubConfig,
|
2019-03-04 15:44:31 -08:00
|
|
|
subscriptions: &Arc<RpcSubscriptions>,
|
|
|
|
pubsub_addr: SocketAddr,
|
|
|
|
exit: &Arc<AtomicBool>,
|
|
|
|
) -> Self {
|
2019-02-17 09:09:46 -08:00
|
|
|
info!("rpc_pubsub bound to {:?}", pubsub_addr);
|
2021-06-16 21:28:23 -07:00
|
|
|
let rpc = RpcSolPubSubImpl::new(
|
|
|
|
subscriptions.clone(),
|
|
|
|
pubsub_config.max_active_subscriptions,
|
|
|
|
);
|
2019-02-17 09:09:46 -08:00
|
|
|
let exit_ = exit.clone();
|
2020-10-01 12:36:58 -07:00
|
|
|
|
2019-02-17 09:09:46 -08:00
|
|
|
let thread_hdl = Builder::new()
|
|
|
|
.name("solana-pubsub".to_string())
|
|
|
|
.spawn(move || {
|
|
|
|
let mut io = PubSubHandler::default();
|
|
|
|
io.extend_with(rpc.to_delegate());
|
|
|
|
|
|
|
|
let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
|
2020-10-01 12:36:58 -07:00
|
|
|
info!("New pubsub connection");
|
|
|
|
let session = Arc::new(Session::new(context.sender()));
|
|
|
|
session.on_drop(|| {
|
|
|
|
info!("Pubsub connection dropped");
|
|
|
|
});
|
|
|
|
session
|
2019-02-17 09:09:46 -08:00
|
|
|
})
|
2020-10-01 12:36:58 -07:00
|
|
|
.max_connections(pubsub_config.max_connections)
|
2021-02-08 22:06:01 -08:00
|
|
|
.max_payload(pubsub_config.max_fragment_size)
|
|
|
|
.max_in_buffer_capacity(pubsub_config.max_in_buffer_capacity)
|
|
|
|
.max_out_buffer_capacity(pubsub_config.max_out_buffer_capacity)
|
2019-02-17 09:09:46 -08:00
|
|
|
.start(&pubsub_addr);
|
|
|
|
|
|
|
|
if let Err(e) = server {
|
2020-10-01 12:36:58 -07:00
|
|
|
warn!(
|
|
|
|
"Pubsub service unavailable error: {:?}. \n\
|
|
|
|
Also, check that port {} is not already in use by another application",
|
|
|
|
e,
|
|
|
|
pubsub_addr.port()
|
|
|
|
);
|
2019-02-17 09:09:46 -08:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
while !exit_.load(Ordering::Relaxed) {
|
|
|
|
sleep(Duration::from_millis(100));
|
|
|
|
}
|
|
|
|
server.unwrap().close();
|
|
|
|
})
|
|
|
|
.unwrap();
|
2019-03-04 15:44:31 -08:00
|
|
|
Self { thread_hdl }
|
2019-02-17 09:09:46 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn close(self) -> thread::Result<()> {
|
|
|
|
self.join()
|
|
|
|
}
|
2019-11-13 10:12:09 -08:00
|
|
|
|
|
|
|
pub fn join(self) -> thread::Result<()> {
|
|
|
|
self.thread_hdl.join()
|
|
|
|
}
|
2019-02-17 09:09:46 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
        solana_runtime::{
            bank::Bank,
            bank_forks::BankForks,
            commitment::BlockCommitmentCache,
            genesis_utils::{create_genesis_config, GenesisConfigInfo},
        },
        std::{
            net::{IpAddr, Ipv4Addr},
            sync::RwLock,
        },
    };

    /// Creating the service spawns a thread named `solana-pubsub`.
    #[test]
    fn test_pubsub_new() {
        // 0.0.0.0 with port 0.
        let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
        let exit = Arc::new(AtomicBool::new(false));

        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let root_bank = Bank::new(&genesis_config);
        let bank_forks = Arc::new(RwLock::new(BankForks::new(root_bank)));
        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
        let commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests()));

        let subscriptions = Arc::new(RpcSubscriptions::new(
            &exit,
            bank_forks,
            commitment_cache,
            optimistically_confirmed_bank,
        ));

        let service =
            PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr, &exit);

        assert_eq!(service.thread_hdl.thread().name(), Some("solana-pubsub"));
    }
}
|