From b47cda3c48f800314257b4b027dc02da8a6b8a19 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Tue, 26 Mar 2024 09:56:13 +0100 Subject: [PATCH] clippy+fmt --- examples/stream_blocks_mainnet_task.rs | 8 ++------ ...grpc_subscription_autoreconnect_streams.rs | 5 ++--- src/grpc_subscription_autoreconnect_tasks.rs | 5 +---- src/yellowstone_grpc_util.rs | 19 ++++++++----------- 4 files changed, 13 insertions(+), 24 deletions(-) diff --git a/examples/stream_blocks_mainnet_task.rs b/examples/stream_blocks_mainnet_task.rs index 431ae2e..b1bf55a 100644 --- a/examples/stream_blocks_mainnet_task.rs +++ b/examples/stream_blocks_mainnet_task.rs @@ -20,12 +20,8 @@ use solana_sdk::transaction::TransactionError; use tokio::sync::mpsc::Receiver; use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock; -use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{ - create_geyser_autoconnection_task_with_mpsc, -}; -use geyser_grpc_connector::grpcmultiplex_fastestwins::{ - FromYellowstoneExtractor, -}; +use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc; +use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor; use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message}; use tokio::time::{sleep, Duration}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index b79c8f6..712e9f4 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -1,11 +1,11 @@ -use crate::{Attempt, GrpcSourceConfig, Message, yellowstone_grpc_util}; +use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message}; use async_stream::stream; use futures::{Stream, StreamExt}; use log::{debug, info, log, trace, warn, Level}; use std::time::Duration; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; -use yellowstone_grpc_client::{GeyserGrpcClientResult}; +use yellowstone_grpc_client::GeyserGrpcClientResult; use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::Status; @@ -22,7 +22,6 @@ pub fn create_geyser_reconnecting_stream( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, ) -> impl Stream { - let mut state = ConnectionState::NotConnected(1); // in case of cancellation, we restart from here: diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 8240a5d..f0d97b9 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,4 +1,4 @@ -use crate::{Attempt, GrpcSourceConfig, Message, yellowstone_grpc_util}; +use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message}; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; use std::time::Duration; @@ -52,7 +52,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc( ) -> AbortHandle { // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ - // task will be aborted when downstream receiver gets dropped let jh_geyser_task = tokio::spawn(async move { let mut state = ConnectionState::NotConnected(1); @@ -135,8 +134,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc( let subscribe_filter = subscribe_filter.clone(); debug!("Subscribe with filter {:?}", subscribe_filter); - - let subscribe_result_timeout = timeout( subscribe_timeout.unwrap_or(Duration::MAX), client.subscribe_once2(subscribe_filter), diff --git a/src/yellowstone_grpc_util.rs b/src/yellowstone_grpc_util.rs index 3696a18..94ec9a2 100644 --- a/src/yellowstone_grpc_util.rs +++ b/src/yellowstone_grpc_util.rs @@ -5,12 +5,11 @@ use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient; use yellowstone_grpc_proto::geyser::SubscribeRequest; use yellowstone_grpc_proto::prost::bytes::Bytes; use yellowstone_grpc_proto::tonic; -use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue; use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue; +use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue; use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; - // see https://github.com/hyperium/tonic/blob/v0.10.2/tonic/src/transport/channel/mod.rs const DEFAULT_BUFFER_SIZE: usize = 1024; // see https://github.com/hyperium/hyper/blob/v0.14.28/src/proto/h2/client.rs#L45 @@ -35,22 +34,19 @@ impl Default for GeyserGrpcClientBufferConfig { } impl GeyserGrpcClientBufferConfig { - pub fn optimize_for_subscription(filter: &SubscribeRequest) -> GeyserGrpcClientBufferConfig { if !filter.blocks.is_empty() { GeyserGrpcClientBufferConfig { - buffer_size: Some(65536), // 64kb (default: 1k) - conn_window: Some(5242880), // 5mb (=default) + buffer_size: Some(65536), // 64kb (default: 1k) + conn_window: Some(5242880), // 5mb (=default) stream_window: Some(4194304), // 4mb (default: 2m) } } else { GeyserGrpcClientBufferConfig::default() } } - } - pub fn connect_with_timeout_with_buffers( endpoint: E, x_token: Option, @@ -59,9 +55,10 @@ pub fn connect_with_timeout_with_buffers( request_timeout: Option, buffer_config: GeyserGrpcClientBufferConfig, ) -> GeyserGrpcClientResult> - where - E: Into, - T: TryInto, { +where + E: Into, + T: TryInto, +{ // see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10 let mut endpoint = tonic::transport::Endpoint::from_shared(endpoint)? .buffer_size(buffer_config.buffer_size) @@ -93,4 +90,4 @@ pub fn connect_with_timeout_with_buffers( .max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()), ); Ok(client) -} \ No newline at end of file +}