Adding a correct way to free the RPC context

This commit is contained in:
Godmode Galactus 2022-11-25 10:22:12 +01:00
parent b73f214085
commit 53995c3de6
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
2 changed files with 41 additions and 21 deletions

View File

@ -27,7 +27,7 @@ pub fn main() {
let lite_rpc = lite_rpc::LightRpc;
io.extend_with(lite_rpc.to_delegate());
let request_processor = LightRpcRequestProcessor::new(json_rpc_url, websocket_url);
let mut request_processor = LightRpcRequestProcessor::new(json_rpc_url, websocket_url);
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
@ -41,18 +41,22 @@ pub fn main() {
let max_request_body_size: usize = 50 * (1 << 10);
let socket_addr = *rpc_addr;
let server =
ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request<hyper::Body>| {
request_processor.clone()
})
.event_loop_executor(runtime.handle().clone())
.threads(1)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any,
]))
.cors_max_age(86400)
.max_request_body_size(max_request_body_size)
.start_http(&socket_addr);
println!("Starting Lite RPC node");
server.unwrap().wait();
{
let request_processor = request_processor.clone();
let server =
ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request<hyper::Body>| {
request_processor.clone()
})
.event_loop_executor(runtime.handle().clone())
.threads(1)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any,
]))
.cors_max_age(86400)
.max_request_body_size(max_request_body_size)
.start_http(&socket_addr);
println!("Starting Lite RPC node");
server.unwrap().wait();
}
request_processor.free();
}

View File

@ -3,7 +3,7 @@ use solana_client::{
tpu_client::TpuClientConfig,
};
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
use std::thread::{Builder, JoinHandle};
use std::{thread::{Builder, JoinHandle}, sync::Mutex};
use crate::context::{BlockInformation, LiteRpcContext};
use {
@ -22,7 +22,7 @@ use {
signature::Signature,
transaction::VersionedTransaction,
},
solana_tpu_client::connection_cache::ConnectionCache,
solana_client::connection_cache::ConnectionCache,
solana_transaction_status::{TransactionBinaryEncoding, UiTransactionEncoding},
std::{
any::type_name,
@ -39,8 +39,8 @@ pub struct LightRpcRequestProcessor {
pub ws_url: String,
pub context: Arc<LiteRpcContext>,
_connection_cache: Arc<ConnectionCache>,
_joinables: Arc<Vec<JoinHandle<()>>>,
_subscribed_clients: Arc<Vec<PubsubBlockClientSubscription>>,
joinables: Arc<Mutex<Vec<JoinHandle<()>>>>,
subscribed_clients: Arc<Mutex<Vec<PubsubBlockClientSubscription>>>,
}
impl LightRpcRequestProcessor {
@ -88,8 +88,8 @@ impl LightRpcRequestProcessor {
ws_url: websocket_url.to_string(),
context,
_connection_cache: connection_cache,
_joinables: Arc::new(joinables),
_subscribed_clients: Arc::new(vec![client_confirmed, client_finalized]),
joinables: Arc::new(Mutex::new(joinables)),
subscribed_clients: Arc::new(Mutex::new(vec![client_confirmed, client_finalized])),
}
}
@ -201,6 +201,22 @@ impl LightRpcRequestProcessor {
}
}
}
pub fn free(&mut self) {
let subscribed_clients = &mut self.subscribed_clients.lock().unwrap();
let len_sc = subscribed_clients.len();
for _i in 0..len_sc {
let mut subscribed_client = subscribed_clients.pop().unwrap();
subscribed_client.send_unsubscribe().unwrap();
subscribed_client.shutdown().unwrap();
}
let joinables = &mut self.joinables.lock().unwrap();
let len = joinables.len();
for _i in 0..len {
joinables.pop().unwrap().join().unwrap();
}
}
}
impl Metadata for LightRpcRequestProcessor {}