diff --git a/Cargo.lock b/Cargo.lock index df3c9b7..80e1e7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1250,6 +1250,7 @@ dependencies = [ "solana-logger", "solana-sdk", "tokio", + "tonic-health", "tracing", "tracing-subscriber", "url", diff --git a/Cargo.toml b/Cargo.toml index f4a69f3..aac6157 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,8 @@ tracing = "0.1.37" itertools = "0.10.5" derive_more = "0.99.17" +tonic-health = "0.10.2" + base64 = "0.21.5" bincode = "1.3.3" diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 4413b97..cb61313 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,4 +1,4 @@ -use crate::{Attempt, GrpcSourceConfig, Message}; +use crate::{Attempt, GrpcSourceConfig, Message, yellowstone_grpc_util}; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; use std::time::Duration; @@ -75,13 +75,15 @@ pub fn create_geyser_autoconnection_task_with_mpsc( attempt, addr ); - let connect_result = GeyserGrpcClient::connect_with_timeout( + + warn!("Use HACKED version of connect_with_timeout_hacked"); + let connect_result = yellowstone_grpc_util::connect_with_timeout_hacked( addr, token, - config, - connect_timeout, - request_timeout, - false, + // config, + // connect_timeout, + // request_timeout, + // false, ) .await; @@ -132,6 +134,8 @@ pub fn create_geyser_autoconnection_task_with_mpsc( let subscribe_filter = subscribe_filter.clone(); debug!("Subscribe with filter {:?}", subscribe_filter); + + let subscribe_result_timeout = timeout( subscribe_timeout.unwrap_or(Duration::MAX), client.subscribe_once2(subscribe_filter), diff --git a/src/lib.rs b/src/lib.rs index a5c1690..1123d87 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ pub mod grpc_subscription_autoreconnect_streams; pub mod grpc_subscription_autoreconnect_tasks; pub mod grpcmultiplex_fastestwins; mod obfuscate; +mod yellowstone_grpc_util; // 1-based attempt counter type Attempt = u32; diff --git a/src/yellowstone_grpc_util.rs b/src/yellowstone_grpc_util.rs new file mode 100644 index 0000000..0591efb --- /dev/null +++ b/src/yellowstone_grpc_util.rs @@ -0,0 +1,64 @@ +use std::time::Duration; +use tonic_health::pb::health_client::HealthClient; +use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult, InterceptorXToken}; +use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient; +use yellowstone_grpc_proto::prost::bytes::Bytes; +use yellowstone_grpc_proto::tonic; +use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue; +use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue; +use yellowstone_grpc_proto::tonic::service::Interceptor; +use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; + + +pub async fn connect_with_timeout( + endpoint: E, + x_token: Option, + tls_config: Option, + connect_timeout: Option, + request_timeout: Option, + connect_lazy: bool, +) -> GeyserGrpcClientResult> + where + E: Into, + T: TryInto, +{ + GeyserGrpcClient::connect_with_timeout( + endpoint, x_token, tls_config, connect_timeout, request_timeout, connect_lazy).await +} + + +pub async fn connect_with_timeout_hacked(endpoint: E, + x_token: Option,) -> GeyserGrpcClientResult> + where + E: Into, + T: TryInto, { + let endpoint = tonic::transport::Endpoint::from_shared(endpoint).unwrap() // FIXME + .buffer_size(Some(65536)) + .initial_connection_window_size(4194304) + .initial_stream_window_size(4194304) + .connect_timeout(Duration::from_secs(10)) + .timeout(Duration::from_secs(10)) + // .http2_adaptive_window() + .tls_config(ClientTlsConfig::new()).unwrap(); // FIXME + + let x_token: Option = match x_token { + Some(x_token) => Some(x_token.try_into().unwrap()), // FIXME replace unwrap + None => None, + }; + // match x_token { + // Some(token) if token.is_empty() => { + // panic!("empty token"); + // } + // _ => {} + // } + let interceptor = InterceptorXToken { x_token }; + + let channel = endpoint.connect_lazy(); + let mut client = GeyserGrpcClient::new( + // TODO move tonic-health + HealthClient::with_interceptor(channel.clone(), interceptor.clone()), + GeyserClient::with_interceptor(channel, interceptor) + .max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()), + ); + Ok(client) +} \ No newline at end of file