This commit is contained in:
aniketfuryrocks 2023-06-17 05:28:13 +05:30
parent a825e3ec28
commit ced6d4ef4b
No known key found for this signature in database
GPG Key ID: 1B75EA596D89FF06
2 changed files with 57 additions and 27 deletions

View File

@ -57,6 +57,18 @@ lazy_static::lazy_static! {
register_int_counter!(opts!("literpc_rpc_airdrop", "RPC call to request airdrop")).unwrap();
static ref RPC_SIGNATURE_SUBSCRIBE: IntCounter =
register_int_counter!(opts!("literpc_rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap();
static ref WS_SERVER_FAIL: IntCounter =
register_int_counter!(opts!("literpc_rpc_ws_server_fail", "WebSocket server failed")).unwrap();
static ref METRICS_SERVICE_FAIL: IntCounter =
register_int_counter!(opts!("literpc_rpc_metrics_service_fail", "Metrics service failed")).unwrap();
static ref HTTP_SERVER_FAIL: IntCounter =
register_int_counter!(opts!("literpc_rpc_http_server_fail", "Http server failed")).unwrap();
static ref PROMETHEUS_SERVER_FAIL: IntCounter =
register_int_counter!(opts!("literpc_rpc_prometheus_server_fail", "Prometheus server failed")).unwrap();
static ref POSTGRES_SERVICE_FAIL: IntCounter =
register_int_counter!(opts!("literpc_rpc_postgres_service_fail", "Postgres service failed")).unwrap();
static ref TX_SERVICE_FAIL: IntCounter =
register_int_counter!(opts!("literpc_rpc_tx_service_fail", "Tx service failed")).unwrap();
}
/// A bridge between clients and tpu
@ -201,21 +213,27 @@ impl LiteBridge {
tokio::select! {
res = ws_server => {
WS_SERVER_FAIL.inc();
bail!("WebSocket server exited unexpectedly {res:?}");
},
res = http_server => {
HTTP_SERVER_FAIL.inc();
bail!("HTTP server exited unexpectedly {res:?}");
},
res = metrics_capture => {
METRICS_SERVICE_FAIL.inc();
bail!("Metrics Capture exited unexpectedly {res:?}");
},
res = prometheus_sync => {
PROMETHEUS_SERVER_FAIL.inc();
bail!("Prometheus Service exited unexpectedly {res:?}");
},
res = postgres => {
TX_SERVICE_FAIL.inc();
bail!("Postgres service exited unexpectedly {res:?}");
},
res = jh_transaction_services => {
TX_SERVICE_FAIL.inc();
bail!("Transaction service exited unexpectedly {res:?}");
}
}

View File

@ -1,10 +1,10 @@
use std::time::Duration;
use anyhow::bail;
use clap::Parser;
use dotenv::dotenv;
use lite_rpc::{bridge::LiteBridge, cli::Args};
use log::info;
use prometheus::{opts, register_int_counter, IntCounter};
use solana_sdk::signature::Keypair;
use std::env;
@ -31,6 +31,11 @@ async fn get_identity_keypair(identity_from_cli: &String) -> Keypair {
}
}
lazy_static::lazy_static! {
static ref RESTARTS: IntCounter =
register_int_counter!(opts!("literpc_rpc_restarts", "Nutber of times lite rpc restarted")).unwrap();
}
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
pub async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
@ -51,8 +56,6 @@ pub async fn main() -> anyhow::Result<()> {
dotenv().ok();
let identity = get_identity_keypair(&identity_keypair).await;
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
let enable_postgres = enable_postgres
@ -64,33 +67,42 @@ pub async fn main() -> anyhow::Result<()> {
let retry_after = Duration::from_secs(transaction_retry_after_secs);
let services = LiteBridge::new(
rpc_addr,
ws_addr,
fanout_size,
identity,
retry_after,
maximum_retries_per_tx,
)
.await?
.start_services(
lite_rpc_http_addr,
lite_rpc_ws_addr,
clean_interval_ms,
enable_postgres,
prometheus_addr,
);
loop {
let identity = get_identity_keypair(&identity_keypair).await;
let ctrl_c_signal = tokio::signal::ctrl_c();
let services = LiteBridge::new(
rpc_addr.clone(),
ws_addr.clone(),
fanout_size,
identity,
retry_after,
maximum_retries_per_tx,
)
.await?
.start_services(
lite_rpc_http_addr.clone(),
lite_rpc_ws_addr.clone(),
clean_interval_ms,
enable_postgres,
prometheus_addr.clone(),
);
tokio::select! {
res = services => {
bail!("Services quit unexpectedly {res:?}");
}
_ = ctrl_c_signal => {
info!("Received ctrl+c signal");
let ctrl_c_signal = tokio::signal::ctrl_c();
Ok(())
tokio::select! {
res = services => {
const RESTART_DURATION: Duration = Duration::from_secs(20);
log::error!("Services quit unexpectedly {res:?} restarting in {RESTART_DURATION:?}");
tokio::time::sleep(RESTART_DURATION).await;
log::error!("Restarting services");
RESTARTS.inc();
}
_ = ctrl_c_signal => {
info!("Received ctrl+c signal");
break Ok(())
}
}
}
}