fixed main

This commit is contained in:
aniketfuryrocks 2023-06-20 18:26:48 +05:30
parent 02cd1e5f46
commit 13a7ab1838
No known key found for this signature in database
GPG Key ID: 1B75EA596D89FF06
7 changed files with 72 additions and 55 deletions

View File

@ -49,4 +49,4 @@ quinn = "0.9.3"
rustls = { version = "=0.20.8", default-features = false }
solana-lite-rpc-services = {path = "services", version="0.2.1"}
solana-lite-rpc-core = {path = "core", version="0.2.1"}
async-trait = "0.1.68"
async-trait = "0.1.68"

View File

@ -4,7 +4,7 @@ use crate::{
};
use clap::Parser;
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long, default_value_t = String::from(DEFAULT_RPC_ADDR))]

View File

@ -1,14 +1,18 @@
use std::time::Duration;
use anyhow::bail;
use clap::Parser;
use dotenv::dotenv;
use lite_rpc::{bridge::LiteBridge, cli::Args};
use log::info;
use prometheus::{opts, register_int_counter, IntCounter};
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::signature::Keypair;
use std::env;
async fn get_identity_keypair(identity_from_cli: &String) -> Keypair {
const RESTART_DURATION: Duration = Duration::from_secs(20);
async fn get_identity_keypair(identity_from_cli: &str) -> Keypair {
if let Ok(identity_env_var) = env::var("IDENTITY") {
if let Ok(identity_bytes) = serde_json::from_str::<Vec<u8>>(identity_env_var.as_str()) {
Keypair::from_bytes(identity_bytes.as_slice()).unwrap()
@ -23,7 +27,7 @@ async fn get_identity_keypair(identity_from_cli: &String) -> Keypair {
} else if identity_from_cli.is_empty() {
Keypair::new()
} else {
let identity_file = tokio::fs::read_to_string(identity_from_cli.as_str())
let identity_file = tokio::fs::read_to_string(identity_from_cli)
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
@ -36,10 +40,7 @@ lazy_static::lazy_static! {
register_int_counter!(opts!("literpc_rpc_restarts", "Number of times lite rpc restarted")).unwrap();
}
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
pub async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> {
let Args {
rpc_addr,
ws_addr,
@ -52,58 +53,79 @@ pub async fn main() -> anyhow::Result<()> {
identity_keypair,
maximum_retries_per_tx,
transaction_retry_after_secs,
} = Args::parse();
} = args;
let identity = get_identity_keypair(&identity_keypair).await;
let retry_after = Duration::from_secs(transaction_retry_after_secs);
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
LiteBridge::new(
rpc_addr,
ws_addr,
fanout_size,
identity,
retry_after,
maximum_retries_per_tx,
)
.await?
.start_services(
lite_rpc_http_addr,
lite_rpc_ws_addr,
clean_interval_ms,
enable_postgres,
prometheus_addr,
)
.await
}
fn get_args() -> Args {
let mut args = Args::parse();
dotenv().ok();
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
let enable_postgres = enable_postgres
args.enable_postgres = args.enable_postgres
|| if let Ok(enable_postgres_env_var) = env::var("PG_ENABLED") {
enable_postgres_env_var != "false"
} else {
false
};
let retry_after = Duration::from_secs(transaction_retry_after_secs);
args
}
loop {
let identity = get_identity_keypair(&identity_keypair).await;
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
pub async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let services = LiteBridge::new(
rpc_addr.clone(),
ws_addr.clone(),
fanout_size,
identity,
retry_after,
maximum_retries_per_tx,
)
.await?
.start_services(
lite_rpc_http_addr.clone(),
lite_rpc_ws_addr.clone(),
clean_interval_ms,
enable_postgres,
prometheus_addr.clone(),
);
let args = get_args();
let ctrl_c_signal = tokio::signal::ctrl_c();
let main: AnyhowJoinHandle = tokio::spawn(async move {
loop {
let Err(err) = start_lite_rpc(args.clone()).await else{
bail!("LiteBridge services returned without error");
};
tokio::select! {
res = services => {
const RESTART_DURATION: Duration = Duration::from_secs(20);
// log and restart
log::error!("Services quit unexpectedly {err:?} restarting in {RESTART_DURATION:?}");
tokio::time::sleep(RESTART_DURATION).await;
log::error!("Services quit unexpectedly {res:?} restarting in {RESTART_DURATION:?}");
tokio::time::sleep(RESTART_DURATION).await;
}
_ = ctrl_c_signal => {
info!("Received ctrl+c signal");
break Ok(())
}
// increment restart
log::error!("Restarting services");
RESTARTS.inc();
}
});
log::error!("Restarting services");
RESTARTS.inc();
let ctrl_c_signal = tokio::signal::ctrl_c();
tokio::select! {
res = main => {
bail!("LiteRpc exited with result {res:?}")
}
_ = ctrl_c_signal => {
info!("Received ctrl+c signal");
Ok(())
}
}
}

View File

@ -6,9 +6,7 @@ use postgres_native_tls::MakeTlsConnector;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{RwLock, RwLockReadGuard},
};
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket};
use native_tls::{Certificate, Identity, TlsConnector};

View File

@ -33,7 +33,8 @@ use solana_lite_rpc_core::{
BlockNotification, NotificationMsg, NotificationSender, TransactionUpdateNotification,
},
subscription_handler::{SubscptionHanderSink, SubscriptionHandler},
tx_store::{TxProps, TxStore}, AnyhowJoinHandle,
tx_store::{TxProps, TxStore},
AnyhowJoinHandle,
};
lazy_static::lazy_static! {

View File

@ -193,9 +193,7 @@ impl TpuService {
log::info!("Got error while polling slot {}", err);
}
bail!(
"Reached max amount of errors to fetch latest slot, exiting poll slot loop"
)
bail!("Reached max amount of errors to fetch latest slot, exiting poll slot loop")
})
}

View File

@ -3,9 +3,7 @@ use anyhow::bail;
use log::error;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_lite_rpc_core::{tx_store::TxStore, AnyhowJoinHandle};
use std::{
time::Duration,
};
use std::time::Duration;
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender},
time::Instant,