RPCTester should test rpc and not self
This commit is contained in:
parent
aa6f4371d6
commit
d16bd5bd0d
|
@ -12,7 +12,7 @@ pub async fn poll_commitment_slots(
|
||||||
commitment_config: CommitmentConfig,
|
commitment_config: CommitmentConfig,
|
||||||
slot_tx: tokio::sync::mpsc::UnboundedSender<Slot>,
|
slot_tx: tokio::sync::mpsc::UnboundedSender<Slot>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut poll_frequency = tokio::time::interval(Duration::from_millis(10));
|
let mut poll_frequency = tokio::time::interval(Duration::from_millis(50));
|
||||||
let mut last_slot = 0;
|
let mut last_slot = 0;
|
||||||
let mut errors = 0;
|
let mut errors = 0;
|
||||||
loop {
|
loop {
|
||||||
|
|
|
@ -74,9 +74,8 @@ pub async fn start_postgres(
|
||||||
Ok((Some(postgres_send), postgres))
|
Ok((Some(postgres_send), postgres))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> {
|
pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::Result<()> {
|
||||||
let Args {
|
let Args {
|
||||||
rpc_addr,
|
|
||||||
lite_rpc_ws_addr,
|
lite_rpc_ws_addr,
|
||||||
lite_rpc_http_addr,
|
lite_rpc_http_addr,
|
||||||
fanout_size,
|
fanout_size,
|
||||||
|
@ -101,8 +100,6 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> {
|
||||||
|
|
||||||
let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr);
|
let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr);
|
||||||
|
|
||||||
// rpc client
|
|
||||||
let rpc_client = Arc::new(RpcClient::new(rpc_addr.clone()));
|
|
||||||
let (subscriptions, cluster_endpoint_tasks) = if use_grpc {
|
let (subscriptions, cluster_endpoint_tasks) = if use_grpc {
|
||||||
create_grpc_subscription(rpc_client.clone(), grpc_addr, GRPC_VERSION.to_string())?
|
create_grpc_subscription(rpc_client.clone(), grpc_addr, GRPC_VERSION.to_string())?
|
||||||
} else {
|
} else {
|
||||||
|
@ -239,9 +236,12 @@ pub async fn main() -> anyhow::Result<()> {
|
||||||
let args = get_args();
|
let args = get_args();
|
||||||
|
|
||||||
let ctrl_c_signal = tokio::signal::ctrl_c();
|
let ctrl_c_signal = tokio::signal::ctrl_c();
|
||||||
let rpc_tester = RpcTester::from(&args).start();
|
let Args { rpc_addr, .. } = &args;
|
||||||
|
// rpc client
|
||||||
|
let rpc_client = Arc::new(RpcClient::new(rpc_addr.clone()));
|
||||||
|
let rpc_tester = tokio::spawn(RpcTester::new(rpc_client.clone()).start());
|
||||||
|
|
||||||
let main = start_lite_rpc(args.clone());
|
let main = start_lite_rpc(args.clone(), rpc_client);
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
err = rpc_tester => {
|
err = rpc_tester => {
|
||||||
|
|
|
@ -1,25 +1,20 @@
|
||||||
use std::net::SocketAddr;
|
|
||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::bail;
|
||||||
use lite_rpc::cli::Args;
|
|
||||||
use prometheus::{opts, register_gauge, Gauge};
|
use prometheus::{opts, register_gauge, Gauge};
|
||||||
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
static ref RPC_RESPONDING: Gauge =
|
static ref RPC_RESPONDING: Gauge =
|
||||||
register_gauge!(opts!("literpc_rpc_responding", "If LiteRpc is responding")).unwrap();
|
register_gauge!(opts!("literpc_rpc_responding", "If LiteRpc is responding")).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RpcTester(RpcClient);
|
pub struct RpcTester {
|
||||||
|
rpc_client: Arc<RpcClient>,
|
||||||
|
}
|
||||||
|
|
||||||
impl From<&Args> for RpcTester {
|
impl RpcTester {
|
||||||
fn from(value: &Args) -> Self {
|
pub fn new(rpc_client: Arc<RpcClient>) -> Self {
|
||||||
let addr: SocketAddr = value
|
Self { rpc_client }
|
||||||
.lite_rpc_http_addr
|
|
||||||
.parse()
|
|
||||||
.expect("Invalid literpc http address");
|
|
||||||
|
|
||||||
RpcTester(RpcClient::new(format!("http://0.0.0.0:{}", addr.port())))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,11 +22,12 @@ impl RpcTester {
|
||||||
/// Starts a loop that checks if the rpc is responding every 5 seconds
|
/// Starts a loop that checks if the rpc is responding every 5 seconds
|
||||||
pub async fn start(self) -> anyhow::Result<()> {
|
pub async fn start(self) -> anyhow::Result<()> {
|
||||||
let mut error_counter = 0;
|
let mut error_counter = 0;
|
||||||
|
let rpc_client = self.rpc_client;
|
||||||
loop {
|
loop {
|
||||||
// sleep for 5 seconds
|
// sleep for 5 seconds
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
||||||
// do a simple request to self for getVersion
|
// do a simple request to self for getVersion
|
||||||
let Err(err) = self.0.get_version().await else {
|
let Err(err) = rpc_client.get_version().await else {
|
||||||
RPC_RESPONDING.set(1.0);
|
RPC_RESPONDING.set(1.0);
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue