add connect_with_timeout_hacked

This commit is contained in:
GroovieGermanikus 2024-03-22 17:57:29 +01:00
parent 7e2fd7f97d
commit 066f9a8800
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
5 changed files with 78 additions and 6 deletions

1
Cargo.lock generated
View File

@ -1250,6 +1250,7 @@ dependencies = [
"solana-logger",
"solana-sdk",
"tokio",
"tonic-health",
"tracing",
"tracing-subscriber",
"url",

View File

@ -27,6 +27,8 @@ tracing = "0.1.37"
itertools = "0.10.5"
derive_more = "0.99.17"
tonic-health = "0.10.2"
base64 = "0.21.5"
bincode = "1.3.3"

View File

@ -1,4 +1,4 @@
use crate::{Attempt, GrpcSourceConfig, Message};
use crate::{Attempt, GrpcSourceConfig, Message, yellowstone_grpc_util};
use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level};
use std::time::Duration;
@ -75,13 +75,15 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
attempt,
addr
);
let connect_result = GeyserGrpcClient::connect_with_timeout(
warn!("Use HACKED version of connect_with_timeout_hacked");
let connect_result = yellowstone_grpc_util::connect_with_timeout_hacked(
addr,
token,
config,
connect_timeout,
request_timeout,
false,
// config,
// connect_timeout,
// request_timeout,
// false,
)
.await;
@ -132,6 +134,8 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
let subscribe_filter = subscribe_filter.clone();
debug!("Subscribe with filter {:?}", subscribe_filter);
let subscribe_result_timeout = timeout(
subscribe_timeout.unwrap_or(Duration::MAX),
client.subscribe_once2(subscribe_filter),

View File

@ -13,6 +13,7 @@ pub mod grpc_subscription_autoreconnect_streams;
pub mod grpc_subscription_autoreconnect_tasks;
pub mod grpcmultiplex_fastestwins;
mod obfuscate;
mod yellowstone_grpc_util;
// 1-based attempt counter
type Attempt = u32;

View File

@ -0,0 +1,64 @@
use std::time::Duration;
use tonic_health::pb::health_client::HealthClient;
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult, InterceptorXToken};
use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient;
use yellowstone_grpc_proto::prost::bytes::Bytes;
use yellowstone_grpc_proto::tonic;
use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue;
use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue;
use yellowstone_grpc_proto::tonic::service::Interceptor;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
pub async fn connect_with_timeout<E, T>(
endpoint: E,
x_token: Option<T>,
tls_config: Option<ClientTlsConfig>,
connect_timeout: Option<Duration>,
request_timeout: Option<Duration>,
connect_lazy: bool,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>>
where
E: Into<Bytes>,
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
{
GeyserGrpcClient::connect_with_timeout(
endpoint, x_token, tls_config, connect_timeout, request_timeout, connect_lazy).await
}
pub async fn connect_with_timeout_hacked<E, T>(endpoint: E,
x_token: Option<T>,) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>>
where
E: Into<Bytes>,
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>, {
let endpoint = tonic::transport::Endpoint::from_shared(endpoint).unwrap() // FIXME
.buffer_size(Some(65536))
.initial_connection_window_size(4194304)
.initial_stream_window_size(4194304)
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10))
// .http2_adaptive_window()
.tls_config(ClientTlsConfig::new()).unwrap(); // FIXME
let x_token: Option<AsciiMetadataValue> = match x_token {
Some(x_token) => Some(x_token.try_into().unwrap()), // FIXME replace unwrap
None => None,
};
// match x_token {
// Some(token) if token.is_empty() => {
// panic!("empty token");
// }
// _ => {}
// }
let interceptor = InterceptorXToken { x_token };
let channel = endpoint.connect_lazy();
let mut client = GeyserGrpcClient::new(
// TODO move tonic-health
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
GeyserClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()),
);
Ok(client)
}