clippy+fmt

This commit is contained in:
GroovieGermanikus 2024-03-26 09:56:13 +01:00
parent 95cde149e9
commit b47cda3c48
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
4 changed files with 13 additions and 24 deletions

View File

@ -20,12 +20,8 @@ use solana_sdk::transaction::TransactionError;
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock; use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{ use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
create_geyser_autoconnection_task_with_mpsc, use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
FromYellowstoneExtractor,
};
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message}; use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;

View File

@ -1,11 +1,11 @@
use crate::{Attempt, GrpcSourceConfig, Message, yellowstone_grpc_util}; use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message};
use async_stream::stream; use async_stream::stream;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use log::{debug, info, log, trace, warn, Level}; use log::{debug, info, log, trace, warn, Level};
use std::time::Duration; use std::time::Duration;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout}; use tokio::time::{sleep, timeout};
use yellowstone_grpc_client::{GeyserGrpcClientResult}; use yellowstone_grpc_client::GeyserGrpcClientResult;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::Status; use yellowstone_grpc_proto::tonic::Status;
@ -22,7 +22,6 @@ pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig, grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest, subscribe_filter: SubscribeRequest,
) -> impl Stream<Item = Message> { ) -> impl Stream<Item = Message> {
let mut state = ConnectionState::NotConnected(1); let mut state = ConnectionState::NotConnected(1);
// in case of cancellation, we restart from here: // in case of cancellation, we restart from here:

View File

@ -1,4 +1,4 @@
use crate::{Attempt, GrpcSourceConfig, Message, yellowstone_grpc_util}; use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level}; use log::{debug, error, info, log, trace, warn, Level};
use std::time::Duration; use std::time::Duration;
@ -52,7 +52,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
) -> AbortHandle { ) -> AbortHandle {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
// task will be aborted when downstream receiver gets dropped // task will be aborted when downstream receiver gets dropped
let jh_geyser_task = tokio::spawn(async move { let jh_geyser_task = tokio::spawn(async move {
let mut state = ConnectionState::NotConnected(1); let mut state = ConnectionState::NotConnected(1);
@ -135,8 +134,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
let subscribe_filter = subscribe_filter.clone(); let subscribe_filter = subscribe_filter.clone();
debug!("Subscribe with filter {:?}", subscribe_filter); debug!("Subscribe with filter {:?}", subscribe_filter);
let subscribe_result_timeout = timeout( let subscribe_result_timeout = timeout(
subscribe_timeout.unwrap_or(Duration::MAX), subscribe_timeout.unwrap_or(Duration::MAX),
client.subscribe_once2(subscribe_filter), client.subscribe_once2(subscribe_filter),

View File

@ -5,12 +5,11 @@ use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient;
use yellowstone_grpc_proto::geyser::SubscribeRequest; use yellowstone_grpc_proto::geyser::SubscribeRequest;
use yellowstone_grpc_proto::prost::bytes::Bytes; use yellowstone_grpc_proto::prost::bytes::Bytes;
use yellowstone_grpc_proto::tonic; use yellowstone_grpc_proto::tonic;
use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue;
use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue; use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue;
use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue;
use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::service::Interceptor;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
// see https://github.com/hyperium/tonic/blob/v0.10.2/tonic/src/transport/channel/mod.rs // see https://github.com/hyperium/tonic/blob/v0.10.2/tonic/src/transport/channel/mod.rs
const DEFAULT_BUFFER_SIZE: usize = 1024; const DEFAULT_BUFFER_SIZE: usize = 1024;
// see https://github.com/hyperium/hyper/blob/v0.14.28/src/proto/h2/client.rs#L45 // see https://github.com/hyperium/hyper/blob/v0.14.28/src/proto/h2/client.rs#L45
@ -35,22 +34,19 @@ impl Default for GeyserGrpcClientBufferConfig {
} }
impl GeyserGrpcClientBufferConfig { impl GeyserGrpcClientBufferConfig {
pub fn optimize_for_subscription(filter: &SubscribeRequest) -> GeyserGrpcClientBufferConfig { pub fn optimize_for_subscription(filter: &SubscribeRequest) -> GeyserGrpcClientBufferConfig {
if !filter.blocks.is_empty() { if !filter.blocks.is_empty() {
GeyserGrpcClientBufferConfig { GeyserGrpcClientBufferConfig {
buffer_size: Some(65536), // 64kb (default: 1k) buffer_size: Some(65536), // 64kb (default: 1k)
conn_window: Some(5242880), // 5mb (=default) conn_window: Some(5242880), // 5mb (=default)
stream_window: Some(4194304), // 4mb (default: 2m) stream_window: Some(4194304), // 4mb (default: 2m)
} }
} else { } else {
GeyserGrpcClientBufferConfig::default() GeyserGrpcClientBufferConfig::default()
} }
} }
} }
pub fn connect_with_timeout_with_buffers<E, T>( pub fn connect_with_timeout_with_buffers<E, T>(
endpoint: E, endpoint: E,
x_token: Option<T>, x_token: Option<T>,
@ -59,9 +55,10 @@ pub fn connect_with_timeout_with_buffers<E, T>(
request_timeout: Option<Duration>, request_timeout: Option<Duration>,
buffer_config: GeyserGrpcClientBufferConfig, buffer_config: GeyserGrpcClientBufferConfig,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>> ) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>>
where where
E: Into<Bytes>, E: Into<Bytes>,
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>, { T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
{
// see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10 // see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10
let mut endpoint = tonic::transport::Endpoint::from_shared(endpoint)? let mut endpoint = tonic::transport::Endpoint::from_shared(endpoint)?
.buffer_size(buffer_config.buffer_size) .buffer_size(buffer_config.buffer_size)
@ -93,4 +90,4 @@ pub fn connect_with_timeout_with_buffers<E, T>(
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()), .max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()),
); );
Ok(client) Ok(client)
} }