diff --git a/lite-rpc/src/cli.rs b/lite-rpc/src/cli.rs index 5d398929..9c418e05 100644 --- a/lite-rpc/src/cli.rs +++ b/lite-rpc/src/cli.rs @@ -67,6 +67,8 @@ pub struct Config { /// postgres config #[serde(default)] pub postgres: Option, + + pub max_number_of_connection: Option, } impl Config { @@ -173,6 +175,10 @@ impl Config { .map(Some) .unwrap_or(config.grpc_x_token4); + config.max_number_of_connection = env::var("MAX_NB_OF_CONNECTIONS_WITH_LEADERS") + .map(|x| x.parse().ok()) + .unwrap_or(config.max_number_of_connection); + config.postgres = PostgresSessionConfig::new_from_env()?.or(config.postgres); Ok(config) diff --git a/lite-rpc/src/lib.rs b/lite-rpc/src/lib.rs index ccf1825c..ff4e6937 100644 --- a/lite-rpc/src/lib.rs +++ b/lite-rpc/src/lib.rs @@ -41,3 +41,6 @@ pub const GRPC_VERSION: &str = "1.16.1"; // cache transactions of 1000 slots by default #[from_env] pub const NB_SLOTS_TRANSACTIONS_TO_CACHE: u64 = 1000; + +#[from_env] +pub const MAX_NB_OF_CONNECTIONS_WITH_LEADERS: usize = 8; diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 5279edbb..f8dbe294 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -7,7 +7,7 @@ use lite_rpc::bridge::LiteBridge; use lite_rpc::cli::Config; use lite_rpc::postgres_logger::PostgresLogger; use lite_rpc::service_spawner::ServiceSpawner; -use lite_rpc::DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE; +use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, MAX_NB_OF_CONNECTIONS_WITH_LEADERS}; use log::{debug, info}; use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming; use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription; @@ -216,7 +216,9 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: connection_timeout: Duration::from_secs(1), connection_retry_count: 10, finalize_timeout: Duration::from_millis(1000), - max_number_of_connections: 8, + max_number_of_connections: args + .max_number_of_connection + .unwrap_or(MAX_NB_OF_CONNECTIONS_WITH_LEADERS), unistream_timeout: Duration::from_millis(500), write_timeout: Duration::from_secs(1), number_of_transactions_per_unistream: 1, @@ -247,7 +249,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, notification_channel.clone(), maximum_retries_per_tx, - slot_notifier, + slot_notifier.resubscribe(), ); let support_service = tokio::spawn(async move { spawner.spawn_support_services().await }); @@ -262,10 +264,12 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: history, block_priofees_service, account_priofees_service, - blocks_notifier, + blocks_notifier.resubscribe(), ) .start(lite_rpc_http_addr, lite_rpc_ws_addr), ); + drop(slot_notifier); + drop(blocks_notifier); tokio::select! { res = tx_service_jh => {