diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index 5fd6d46e..49bd0949 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -249,6 +249,7 @@ fn process_block( pub fn create_block_processing_task( grpc_addr: String, + grpc_x_token: Option, block_sx: Sender, commitment_level: CommitmentLevel, ) -> AnyhowJoinHandle { @@ -271,7 +272,7 @@ pub fn create_block_processing_task( tokio::spawn(async move { // connect to grpc - let mut client = GeyserGrpcClient::connect(grpc_addr, None::<&'static str>, None)?; + let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token, None)?; let mut stream = client .subscribe_once( HashMap::new(), @@ -314,6 +315,7 @@ pub fn create_block_processing_task( pub fn create_grpc_subscription( rpc_client: Arc, grpc_addr: String, + grpc_x_token: Option, expected_grpc_version: String, ) -> anyhow::Result<(EndpointStreaming, Vec)> { let (slot_sx, slot_notifier) = tokio::sync::broadcast::channel(10); @@ -324,10 +326,11 @@ pub fn create_grpc_subscription( let mut slots = HashMap::new(); slots.insert("client".to_string(), SubscribeRequestFilterSlots {}); - let grpc_addr_cp = grpc_addr.clone(); + let grpc_addr_cp: String = grpc_addr.clone(); + let x_token_cp = grpc_x_token.clone(); let slot_task: AnyhowJoinHandle = tokio::spawn(async move { // connect to grpc - let mut client = GeyserGrpcClient::connect(grpc_addr_cp, None::<&'static str>, None)?; + let mut client = GeyserGrpcClient::connect(grpc_addr_cp, x_token_cp, None)?; let version = client.get_version().await?.version; if version != expected_grpc_version { @@ -379,11 +382,16 @@ pub fn create_grpc_subscription( let block_confirmed_task: AnyhowJoinHandle = create_block_processing_task( grpc_addr.clone(), + grpc_x_token.clone(), block_sx.clone(), CommitmentLevel::Confirmed, ); - let block_finalized_task: AnyhowJoinHandle = - create_block_processing_task(grpc_addr, block_sx, CommitmentLevel::Finalized); + let block_finalized_task: AnyhowJoinHandle = create_block_processing_task( + grpc_addr, + grpc_x_token, + block_sx, + CommitmentLevel::Finalized, + ); let cluster_info_polling = poll_vote_accounts_and_cluster_info(rpc_client, cluster_info_sx, va_sx); diff --git a/cluster-endpoints/src/rpc_polling/poll_slots.rs b/cluster-endpoints/src/rpc_polling/poll_slots.rs index fd363f4f..fe97622d 100644 --- a/cluster-endpoints/src/rpc_polling/poll_slots.rs +++ b/cluster-endpoints/src/rpc_polling/poll_slots.rs @@ -1,4 +1,7 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + sync::Arc, + time::{self, Duration}, +}; use anyhow::{bail, Context}; use solana_client::nonblocking::rpc_client::RpcClient; diff --git a/lite-rpc/src/cli.rs b/lite-rpc/src/cli.rs index 78d2659a..9e9cfdae 100644 --- a/lite-rpc/src/cli.rs +++ b/lite-rpc/src/cli.rs @@ -37,4 +37,6 @@ pub struct Args { /// grpc address #[arg(long, default_value_t = String::from(DEFAULT_GRPC_ADDR))] pub grpc_addr: String, + #[arg(long)] + pub grpc_x_token: Option, } diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index b968068f..97b43c15 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -46,6 +46,7 @@ use std::env; use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::Arc; use tokio::sync::mpsc; +use tokio::time::sleep; use crate::rpc_tester::RpcTester; @@ -95,6 +96,7 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc) -> anyhow::R quic_proxy_addr, use_grpc, grpc_addr, + grpc_x_token, .. } = args; @@ -109,7 +111,12 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc) -> anyhow::R let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr); let (subscriptions, cluster_endpoint_tasks) = if use_grpc { - create_grpc_subscription(rpc_client.clone(), grpc_addr, GRPC_VERSION.to_string())? + create_grpc_subscription( + rpc_client.clone(), + grpc_addr, + grpc_x_token, + GRPC_VERSION.to_string(), + )? } else { create_json_rpc_polling_subscription(rpc_client.clone())? };