allow x-token

This commit is contained in:
Arrowana 2023-11-07 21:04:17 +11:00
parent 45392943ae
commit 15ffb44f6e
4 changed files with 27 additions and 7 deletions

View File

@ -249,6 +249,7 @@ fn process_block(
pub fn create_block_processing_task(
grpc_addr: String,
grpc_x_token: Option<String>,
block_sx: Sender<ProducedBlock>,
commitment_level: CommitmentLevel,
) -> AnyhowJoinHandle {
@ -271,7 +272,7 @@ pub fn create_block_processing_task(
tokio::spawn(async move {
// connect to grpc
let mut client = GeyserGrpcClient::connect(grpc_addr, None::<&'static str>, None)?;
let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token, None)?;
let mut stream = client
.subscribe_once(
HashMap::new(),
@ -314,6 +315,7 @@ pub fn create_block_processing_task(
pub fn create_grpc_subscription(
rpc_client: Arc<RpcClient>,
grpc_addr: String,
grpc_x_token: Option<String>,
expected_grpc_version: String,
) -> anyhow::Result<(EndpointStreaming, Vec<AnyhowJoinHandle>)> {
let (slot_sx, slot_notifier) = tokio::sync::broadcast::channel(10);
@ -324,10 +326,11 @@ pub fn create_grpc_subscription(
let mut slots = HashMap::new();
slots.insert("client".to_string(), SubscribeRequestFilterSlots {});
let grpc_addr_cp = grpc_addr.clone();
let grpc_addr_cp: String = grpc_addr.clone();
let x_token_cp = grpc_x_token.clone();
let slot_task: AnyhowJoinHandle = tokio::spawn(async move {
// connect to grpc
let mut client = GeyserGrpcClient::connect(grpc_addr_cp, None::<&'static str>, None)?;
let mut client = GeyserGrpcClient::connect(grpc_addr_cp, x_token_cp, None)?;
let version = client.get_version().await?.version;
if version != expected_grpc_version {
@ -379,11 +382,16 @@ pub fn create_grpc_subscription(
let block_confirmed_task: AnyhowJoinHandle = create_block_processing_task(
grpc_addr.clone(),
grpc_x_token.clone(),
block_sx.clone(),
CommitmentLevel::Confirmed,
);
let block_finalized_task: AnyhowJoinHandle =
create_block_processing_task(grpc_addr, block_sx, CommitmentLevel::Finalized);
let block_finalized_task: AnyhowJoinHandle = create_block_processing_task(
grpc_addr,
grpc_x_token,
block_sx,
CommitmentLevel::Finalized,
);
let cluster_info_polling =
poll_vote_accounts_and_cluster_info(rpc_client, cluster_info_sx, va_sx);

View File

@ -1,4 +1,7 @@
use std::{sync::Arc, time::Duration};
use std::{
sync::Arc,
time::{self, Duration},
};
use anyhow::{bail, Context};
use solana_client::nonblocking::rpc_client::RpcClient;

View File

@ -37,4 +37,6 @@ pub struct Args {
/// grpc address
#[arg(long, default_value_t = String::from(DEFAULT_GRPC_ADDR))]
pub grpc_addr: String,
#[arg(long)]
pub grpc_x_token: Option<String>,
}

View File

@ -46,6 +46,7 @@ use std::env;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::sleep;
use crate::rpc_tester::RpcTester;
@ -95,6 +96,7 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
quic_proxy_addr,
use_grpc,
grpc_addr,
grpc_x_token,
..
} = args;
@ -109,7 +111,12 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr);
let (subscriptions, cluster_endpoint_tasks) = if use_grpc {
create_grpc_subscription(rpc_client.clone(), grpc_addr, GRPC_VERSION.to_string())?
create_grpc_subscription(
rpc_client.clone(),
grpc_addr,
grpc_x_token,
GRPC_VERSION.to_string(),
)?
} else {
create_json_rpc_polling_subscription(rpc_client.clone())?
};