use std::time::Duration; use tonic_health::pb::health_client::HealthClient; use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult, InterceptorXToken}; use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient; use yellowstone_grpc_proto::geyser::SubscribeRequest; use yellowstone_grpc_proto::prost::bytes::Bytes; use yellowstone_grpc_proto::tonic; use tonic::codec::CompressionEncoding; use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue; use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue; use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; // see https://github.com/hyperium/tonic/blob/v0.10.2/tonic/src/transport/channel/mod.rs const DEFAULT_BUFFER_SIZE: usize = 1024; // see https://github.com/hyperium/hyper/blob/v0.14.28/src/proto/h2/client.rs#L45 const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb #[derive(Debug, Clone)] pub struct GeyserGrpcClientBufferConfig { pub buffer_size: Option, pub conn_window: Option, pub stream_window: Option, } impl Default for GeyserGrpcClientBufferConfig { fn default() -> Self { GeyserGrpcClientBufferConfig { buffer_size: Some(DEFAULT_BUFFER_SIZE), conn_window: Some(DEFAULT_CONN_WINDOW), stream_window: Some(DEFAULT_STREAM_WINDOW), } } } impl GeyserGrpcClientBufferConfig { pub fn optimize_for_subscription(filter: &SubscribeRequest) -> GeyserGrpcClientBufferConfig { if !filter.blocks.is_empty() { GeyserGrpcClientBufferConfig { buffer_size: Some(65536), // 64kb (default: 1k) conn_window: Some(5242880), // 5mb (=default) stream_window: Some(4194304), // 4mb (default: 2m) } } else { GeyserGrpcClientBufferConfig::default() } } } pub async fn connect_with_timeout_with_buffers( endpoint: E, x_token: Option, tls_config: Option, connect_timeout: Option, request_timeout: Option, buffer_config: GeyserGrpcClientBufferConfig, compression: bool, ) -> GeyserGrpcClientResult> where E: Into, T: TryInto, { // see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10 let mut endpoint = tonic::transport::Endpoint::from_shared(endpoint)? .buffer_size(buffer_config.buffer_size) .initial_connection_window_size(buffer_config.conn_window) .initial_stream_window_size(buffer_config.stream_window); if let Some(tls_config) = tls_config { endpoint = endpoint.tls_config(tls_config)?; } if let Some(connect_timeout) = connect_timeout { endpoint = endpoint.timeout(connect_timeout); } if let Some(request_timeout) = request_timeout { endpoint = endpoint.timeout(request_timeout); } let x_token: Option = match x_token { Some(x_token) => Some(x_token.try_into()?), None => None, }; let interceptor = InterceptorXToken { x_token }; let channel = endpoint.connect_lazy(); let mut geyser_client = GeyserClient::with_interceptor(channel.clone(), interceptor.clone()) .max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()); if compression { geyser_client = geyser_client.accept_compressed(CompressionEncoding::Gzip).send_compressed(CompressionEncoding::Gzip); } let client = GeyserGrpcClient::new( HealthClient::with_interceptor(channel, interceptor), geyser_client ); Ok(client) }