From d16bd5bd0d784ff4194bea19bca0c2c3365947b3 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Thu, 7 Sep 2023 16:46:14 +0200 Subject: [PATCH] RPCTester should test rpc and not self --- .../src/rpc_polling/poll_slots.rs | 2 +- lite-rpc/src/main.rs | 12 +++++----- lite-rpc/src/rpc_tester.rs | 22 ++++++++----------- 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/cluster-endpoints/src/rpc_polling/poll_slots.rs b/cluster-endpoints/src/rpc_polling/poll_slots.rs index 16e625df..b2a922ce 100644 --- a/cluster-endpoints/src/rpc_polling/poll_slots.rs +++ b/cluster-endpoints/src/rpc_polling/poll_slots.rs @@ -12,7 +12,7 @@ pub async fn poll_commitment_slots( commitment_config: CommitmentConfig, slot_tx: tokio::sync::mpsc::UnboundedSender, ) -> anyhow::Result<()> { - let mut poll_frequency = tokio::time::interval(Duration::from_millis(10)); + let mut poll_frequency = tokio::time::interval(Duration::from_millis(50)); let mut last_slot = 0; let mut errors = 0; loop { diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 365911cb..7b3a253e 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -74,9 +74,8 @@ pub async fn start_postgres( Ok((Some(postgres_send), postgres)) } -pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> { +pub async fn start_lite_rpc(args: Args, rpc_client: Arc) -> anyhow::Result<()> { let Args { - rpc_addr, lite_rpc_ws_addr, lite_rpc_http_addr, fanout_size, @@ -101,8 +100,6 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> { let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr); - // rpc client - let rpc_client = Arc::new(RpcClient::new(rpc_addr.clone())); let (subscriptions, cluster_endpoint_tasks) = if use_grpc { create_grpc_subscription(rpc_client.clone(), grpc_addr, GRPC_VERSION.to_string())? } else { @@ -239,9 +236,12 @@ pub async fn main() -> anyhow::Result<()> { let args = get_args(); let ctrl_c_signal = tokio::signal::ctrl_c(); - let rpc_tester = RpcTester::from(&args).start(); + let Args { rpc_addr, .. } = &args; + // rpc client + let rpc_client = Arc::new(RpcClient::new(rpc_addr.clone())); + let rpc_tester = tokio::spawn(RpcTester::new(rpc_client.clone()).start()); - let main = start_lite_rpc(args.clone()); + let main = start_lite_rpc(args.clone(), rpc_client); tokio::select! { err = rpc_tester => { diff --git a/lite-rpc/src/rpc_tester.rs b/lite-rpc/src/rpc_tester.rs index a97c0b66..4823a01a 100644 --- a/lite-rpc/src/rpc_tester.rs +++ b/lite-rpc/src/rpc_tester.rs @@ -1,25 +1,20 @@ -use std::net::SocketAddr; - use anyhow::bail; -use lite_rpc::cli::Args; use prometheus::{opts, register_gauge, Gauge}; use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use std::sync::Arc; lazy_static::lazy_static! { static ref RPC_RESPONDING: Gauge = register_gauge!(opts!("literpc_rpc_responding", "If LiteRpc is responding")).unwrap(); } -pub struct RpcTester(RpcClient); +pub struct RpcTester { + rpc_client: Arc, +} -impl From<&Args> for RpcTester { - fn from(value: &Args) -> Self { - let addr: SocketAddr = value - .lite_rpc_http_addr - .parse() - .expect("Invalid literpc http address"); - - RpcTester(RpcClient::new(format!("http://0.0.0.0:{}", addr.port()))) +impl RpcTester { + pub fn new(rpc_client: Arc) -> Self { + Self { rpc_client } } } @@ -27,11 +22,12 @@ impl RpcTester { /// Starts a loop that checks if the rpc is responding every 5 seconds pub async fn start(self) -> anyhow::Result<()> { let mut error_counter = 0; + let rpc_client = self.rpc_client; loop { // sleep for 5 seconds tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; // do a simple request to self for getVersion - let Err(err) = self.0.get_version().await else { + let Err(err) = rpc_client.get_version().await else { RPC_RESPONDING.set(1.0); continue; };