This commit is contained in:
GroovieGermanikus 2024-04-16 10:12:59 +02:00
parent c1ca3e2e67
commit 1d2937ad95
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
5 changed files with 16 additions and 14 deletions

View File

@ -1,9 +1,7 @@
use futures::Stream;
use log::{info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::pin::pin;
use base64::Engine;
use itertools::Itertools;
@ -23,10 +21,10 @@ use tokio::sync::mpsc::Receiver;
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{
create_geyser_autoconnection_task, create_geyser_autoconnection_task_with_mpsc,
create_geyser_autoconnection_task_with_mpsc,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
FromYellowstoneExtractor,
};
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration};

View File

@ -14,7 +14,6 @@ use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prost::Message as _;
use csv::Writer;
#[allow(dead_code)]
fn start_example_blockmini_consumer(

View File

@ -5,9 +5,10 @@ use log::{debug, info, log, trace, warn, Level};
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_client::GeyserGrpcClientResult;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::Status;
use crate::yellowstone_grpc_util::{connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig};
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected(Attempt),
@ -45,12 +46,16 @@ pub fn create_geyser_reconnecting_stream(
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
async move {
let connect_result = GeyserGrpcClient::connect_with_timeout(
addr, token, config,
connect_timeout,
request_timeout,
false)
.await;
let connect_result = connect_with_timeout_with_buffers(
addr,
token,
config,
connect_timeout,
request_timeout,
GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter),
)
.await;
let mut client = connect_result?;

View File

@ -9,7 +9,7 @@ pub mod channel_plugger;
pub mod grpc_subscription_autoreconnect_streams;
pub mod grpc_subscription_autoreconnect_tasks;
pub mod grpcmultiplex_fastestwins;
mod yellowstone_grpc_util;
pub mod yellowstone_grpc_util;
mod obfuscate;
// 1-based attempt counter

View File

@ -104,7 +104,7 @@ pub async fn connect_with_timeout_with_buffers<E, T>(
let interceptor = InterceptorXToken { x_token };
let channel = endpoint.connect_lazy();
let mut client = GeyserGrpcClient::new(
let client = GeyserGrpcClient::new(
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
GeyserClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()),