rpc tester and other fixes

This commit is contained in:
aniketfuryrocks 2023-06-22 17:02:25 +05:30
parent a2057973b8
commit cbb1b8447f
No known key found for this signature in database
GPG Key ID: 1B75EA596D89FF06
6 changed files with 79 additions and 28 deletions

View File

@ -1,7 +1,7 @@
use crate::structures::identity_stakes::IdentityStakes;
use anyhow::Context;
use anyhow::{bail, Context};
use futures::StreamExt;
use log::{info, warn};
use log::info;
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
@ -74,15 +74,15 @@ impl SolanaUtils {
commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
})
.await
.context("error getting slot")?;
.context("Error getting slot")?;
update_slot(slot);
let (mut client, unsub) =
tokio::time::timeout(Duration::from_millis(1000), pubsub_client.slot_subscribe())
.await
.context("timedout subscribing to slots")?
.context("slot pub sub disconnected")?;
.context("Timeout subscribing to slots")?
.context("Slot pub sub disconnected")?;
while let Ok(slot_info) =
tokio::time::timeout(Duration::from_millis(2000), client.next()).await
@ -92,10 +92,9 @@ impl SolanaUtils {
}
}
warn!("slot pub sub disconnected reconnecting");
unsub();
Ok(())
bail!("Slot pub sub disconnected")
}
// Estimates the slots, either from polled slot or by forcefully updating after every 400ms

View File

@ -170,8 +170,7 @@ impl LiteBridge {
self.block_store.clone(),
self.max_retries,
clean_interval,
)
.await;
);
self.transaction_service = Some(transaction_service);
@ -217,27 +216,27 @@ impl LiteBridge {
tokio::select! {
res = ws_server => {
WS_SERVER_FAIL.inc();
bail!("WebSocket server exited unexpectedly {res:?}");
bail!("WebSocket server {res:?}");
},
res = http_server => {
HTTP_SERVER_FAIL.inc();
bail!("HTTP server exited unexpectedly {res:?}");
bail!("HTTP server {res:?}");
},
res = metrics_capture => {
METRICS_SERVICE_FAIL.inc();
bail!("Metrics Capture exited unexpectedly {res:?}");
bail!("Metrics Capture {res:?}");
},
res = prometheus_sync => {
PROMETHEUS_SERVER_FAIL.inc();
bail!("Prometheus Service exited unexpectedly {res:?}");
bail!("Prometheus Service {res:?}");
},
res = postgres => {
POSTGRES_SERVICE_FAIL.inc();
bail!("Postgres service exited unexpectedly {res:?}");
bail!("Postgres service {res:?}");
},
res = jh_transaction_services => {
TX_SERVICE_FAIL.inc();
bail!("Transaction service exited unexpectedly {res:?}");
bail!("Transaction service {res:?}");
}
}
}

View File

@ -1,5 +1,8 @@
pub mod rpc_tester;
use std::time::Duration;
use anyhow::Context;
use clap::Parser;
use dotenv::dotenv;
use lite_rpc::{bridge::LiteBridge, cli::Args};
@ -7,6 +10,8 @@ use prometheus::{opts, register_int_counter, IntCounter};
use solana_sdk::signature::Keypair;
use std::env;
use crate::rpc_tester::RpcTester;
const RESTART_DURATION: Duration = Duration::from_secs(20);
async fn get_identity_keypair(identity_from_cli: &str) -> Keypair {
@ -65,7 +70,8 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> {
retry_after,
maximum_retries_per_tx,
)
.await?
.await
.context("Error building LiteBridge")?
.start_services(
lite_rpc_http_addr,
lite_rpc_ws_addr,
@ -97,6 +103,8 @@ pub async fn main() -> anyhow::Result<()> {
let args = get_args();
let rpc_tester = RpcTester::from(&args).start();
let main = tokio::spawn(async move {
loop {
let Err(err) = start_lite_rpc(args.clone()).await else {
@ -116,6 +124,10 @@ pub async fn main() -> anyhow::Result<()> {
let ctrl_c_signal = tokio::signal::ctrl_c();
tokio::select! {
err = rpc_tester => {
// This should never happen
unreachable!("{err:?}")
}
err = main => {
// This should never happen
unreachable!("{err:?}")

View File

@ -0,0 +1,41 @@
use std::net::SocketAddr;
use lite_rpc::cli::Args;
use prometheus::{opts, register_gauge, Gauge};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
lazy_static::lazy_static! {
static ref RPC_RESPONDING: Gauge =
register_gauge!(opts!("literpc_rpc_responding", "If LiteRpc is responding")).unwrap();
}
pub struct RpcTester(RpcClient);
impl From<&Args> for RpcTester {
fn from(value: &Args) -> Self {
let addr: SocketAddr = value
.lite_rpc_http_addr
.parse()
.expect("Invalid literpc http address");
RpcTester(RpcClient::new(format!("http://0.0.0.0:{}", addr.port())))
}
}
impl RpcTester {
/// Starts a loop that checks if the rpc is responding every 5 seconds
pub async fn start(self) -> ! {
loop {
// sleep for 5 seconds
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// do a simple request to self for getVersion
let Err(err) = self.0.get_version().await else {
RPC_RESPONDING.set(1.0);
continue;
};
RPC_RESPONDING.set(0.0);
log::error!("Rpc not responding {err:?}");
}
}
}

View File

@ -163,7 +163,7 @@ impl TpuService {
.await;
}
async fn update_current_slot(
fn update_current_slot(
&self,
update_notifier: tokio::sync::mpsc::UnboundedSender<u64>,
) -> AnyhowJoinHandle {
@ -256,13 +256,13 @@ impl TpuService {
tokio::select! {
res = update_leader_schedule_service => {
bail!("Leader update service exited unexpectedly {res:?}");
bail!("Leader update Service {res:?}");
},
res = slot_poll_service => {
bail!("Leader update service exited unexpectedly {res:?}");
bail!("Slot Poll Service {res:?}");
},
res = estimated_slot_service => {
bail!("Estimated slot calculation service exited unexpectedly {res:?}");
bail!("Estimated slot Service {res:?}");
},
}
}

View File

@ -49,7 +49,7 @@ impl TransactionServiceBuilder {
}
}
pub async fn start(
pub fn start(
self,
notifier: Option<NotificationSender>,
block_store: BlockStore,
@ -94,25 +94,25 @@ impl TransactionServiceBuilder {
tokio::select! {
res = tpu_service_fx => {
bail!("{res:?}")
bail!("Tpu Service {res:?}")
},
res = tx_sender_jh => {
bail!("{res:?}")
bail!("Tx Sender {res:?}")
},
res = finalized_block_listener => {
bail!("{res:?}")
bail!("Finalized Block Listener {res:?}")
},
res = confirmed_block_listener => {
bail!("{res:?}")
bail!("Confirmed Block Listener {res:?}")
},
res = processed_block_listener => {
bail!("{res:?}")
bail!("Processed Block Listener {res:?}")
},
res = replay_service => {
bail!("{res:?}")
bail!("Replay Service {res:?}")
},
res = cleaner => {
bail!("{res:?}")
bail!("Cleaner {res:?}")
},
}
})