From a109037223f2b774da0defe0a29df6d18bcbe93b Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Tue, 16 Apr 2024 15:54:38 +0200 Subject: [PATCH] fix fmt --- .../src/grpc/grpc_accounts_streaming.rs | 13 +++++--- cluster-endpoints/src/grpc_subscription.rs | 33 ++++++++++--------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs b/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs index be39d553..7713de61 100644 --- a/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs +++ b/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs @@ -6,8 +6,10 @@ use std::{ time::Duration, }; +use geyser_grpc_connector::yellowstone_grpc_util::{ + connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig, +}; use geyser_grpc_connector::{GeyserGrpcClient, GeyserGrpcClientResult, GrpcSourceConfig}; -use geyser_grpc_connector::yellowstone_grpc_util::{connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig}; use itertools::Itertools; use solana_lite_rpc_core::{ commitment_utils::Commitment, @@ -111,9 +113,8 @@ pub fn start_account_streaming_tasks( ping: None, }; - let mut client = create_connection(&grpc_config).await?; - + let account_stream = client.subscribe_once2(program_subscription).await.unwrap(); // each account subscription batch will require individual stream @@ -213,7 +214,9 @@ pub fn start_account_streaming_tasks( }) } -async fn create_connection(grpc_config: &GrpcSourceConfig) -> GeyserGrpcClientResult> { +async fn create_connection( + grpc_config: &GrpcSourceConfig, +) -> GeyserGrpcClientResult> { connect_with_timeout_with_buffers( grpc_config.grpc_addr.clone(), grpc_config.grpc_x_token.clone(), @@ -226,7 +229,7 @@ async fn create_connection(grpc_config: &GrpcSourceConfig) -> GeyserGrpcClientRe stream_window: Some(4194304), }, ) - .await + .await } pub fn create_grpc_account_streaming( diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index e1c18d41..7f0150bf 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -5,6 +5,9 @@ use crate::grpc_multiplex::{ }; use anyhow::Context; use futures::StreamExt; +use geyser_grpc_connector::yellowstone_grpc_util::{ + connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig, +}; use geyser_grpc_connector::GrpcSourceConfig; use itertools::Itertools; use log::trace; @@ -36,13 +39,10 @@ use std::cell::OnceCell; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use geyser_grpc_connector::yellowstone_grpc_util::{connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig}; use tokio::sync::{broadcast, Notify}; use tracing::trace_span; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; -use yellowstone_grpc_proto::geyser::{ - CommitmentLevel, SubscribeRequestFilterBlocks -}; +use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks}; use crate::rpc_polling::vote_accounts_and_cluster_info_polling::{ poll_cluster_info, poll_vote_accounts, @@ -291,18 +291,19 @@ pub fn create_block_processing_task( ); // connect to grpc - let mut client = - connect_with_timeout_with_buffers( - grpc_addr.clone(), grpc_x_token.clone(), - None, - Some(Duration::from_secs(10)), - Some(Duration::from_secs(10)), - GeyserGrpcClientBufferConfig { - buffer_size: Some(65536), - conn_window: Some(5242880), - stream_window: Some(4194304), - }, - ).await?; + let mut client = connect_with_timeout_with_buffers( + grpc_addr.clone(), + grpc_x_token.clone(), + None, + Some(Duration::from_secs(10)), + Some(Duration::from_secs(10)), + GeyserGrpcClientBufferConfig { + buffer_size: Some(65536), + conn_window: Some(5242880), + stream_window: Some(4194304), + }, + ) + .await?; let mut stream = tokio::select! { res = client .subscribe_once(