101 lines
3.7 KiB
Rust
101 lines
3.7 KiB
Rust
use std::time::Duration;
|
|
use tonic_health::pb::health_client::HealthClient;
|
|
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult, InterceptorXToken};
|
|
use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient;
|
|
use yellowstone_grpc_proto::geyser::SubscribeRequest;
|
|
use yellowstone_grpc_proto::prost::bytes::Bytes;
|
|
use yellowstone_grpc_proto::tonic;
|
|
use tonic::codec::CompressionEncoding;
|
|
use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue;
|
|
use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue;
|
|
use yellowstone_grpc_proto::tonic::service::Interceptor;
|
|
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
|
|
|
|
// see https://github.com/hyperium/tonic/blob/v0.10.2/tonic/src/transport/channel/mod.rs
|
|
const DEFAULT_BUFFER_SIZE: usize = 1024;
|
|
// see https://github.com/hyperium/hyper/blob/v0.14.28/src/proto/h2/client.rs#L45
|
|
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
|
|
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct GeyserGrpcClientBufferConfig {
|
|
pub buffer_size: Option<usize>,
|
|
pub conn_window: Option<u32>,
|
|
pub stream_window: Option<u32>,
|
|
}
|
|
|
|
impl Default for GeyserGrpcClientBufferConfig {
|
|
fn default() -> Self {
|
|
GeyserGrpcClientBufferConfig {
|
|
buffer_size: Some(DEFAULT_BUFFER_SIZE),
|
|
conn_window: Some(DEFAULT_CONN_WINDOW),
|
|
stream_window: Some(DEFAULT_STREAM_WINDOW),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl GeyserGrpcClientBufferConfig {
|
|
pub fn optimize_for_subscription(filter: &SubscribeRequest) -> GeyserGrpcClientBufferConfig {
|
|
if !filter.blocks.is_empty() {
|
|
GeyserGrpcClientBufferConfig {
|
|
buffer_size: Some(65536), // 64kb (default: 1k)
|
|
conn_window: Some(5242880), // 5mb (=default)
|
|
stream_window: Some(4194304), // 4mb (default: 2m)
|
|
}
|
|
} else {
|
|
GeyserGrpcClientBufferConfig::default()
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn connect_with_timeout_with_buffers<E, T>(
|
|
endpoint: E,
|
|
x_token: Option<T>,
|
|
tls_config: Option<ClientTlsConfig>,
|
|
connect_timeout: Option<Duration>,
|
|
request_timeout: Option<Duration>,
|
|
buffer_config: GeyserGrpcClientBufferConfig,
|
|
compression: bool,
|
|
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>>
|
|
where
|
|
E: Into<Bytes>,
|
|
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
|
|
{
|
|
// see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10
|
|
let mut endpoint = tonic::transport::Endpoint::from_shared(endpoint)?
|
|
.buffer_size(buffer_config.buffer_size)
|
|
.initial_connection_window_size(buffer_config.conn_window)
|
|
.initial_stream_window_size(buffer_config.stream_window);
|
|
|
|
if let Some(tls_config) = tls_config {
|
|
endpoint = endpoint.tls_config(tls_config)?;
|
|
}
|
|
|
|
if let Some(connect_timeout) = connect_timeout {
|
|
endpoint = endpoint.timeout(connect_timeout);
|
|
}
|
|
|
|
if let Some(request_timeout) = request_timeout {
|
|
endpoint = endpoint.timeout(request_timeout);
|
|
}
|
|
|
|
let x_token: Option<AsciiMetadataValue> = match x_token {
|
|
Some(x_token) => Some(x_token.try_into()?),
|
|
None => None,
|
|
};
|
|
let interceptor = InterceptorXToken { x_token };
|
|
|
|
let channel = endpoint.connect_lazy();
|
|
let mut geyser_client = GeyserClient::with_interceptor(channel.clone(), interceptor.clone())
|
|
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size());
|
|
|
|
if compression {
|
|
geyser_client = geyser_client.accept_compressed(CompressionEncoding::Gzip).send_compressed(CompressionEncoding::Gzip);
|
|
}
|
|
let client = GeyserGrpcClient::new(
|
|
HealthClient::with_interceptor(channel, interceptor),
|
|
geyser_client
|
|
);
|
|
Ok(client)
|
|
}
|