From 1d0b7890c3f422679cdd018d1c5c783424f85d69 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 22 Jan 2024 18:26:26 +0100 Subject: [PATCH] move Message type --- examples/stream_blocks_autoconnect.rs | 4 ++-- ...grpc_subscription_autoreconnect_streams.rs | 13 +----------- src/grpc_subscription_autoreconnect_tasks.rs | 15 +++----------- src/grpcmultiplex_fastestwins.rs | 4 ++-- src/lib.rs | 20 +++++++++++++------ 5 files changed, 22 insertions(+), 34 deletions(-) diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs index 4e21eb3..c6edbdb 100644 --- a/examples/stream_blocks_autoconnect.rs +++ b/examples/stream_blocks_autoconnect.rs @@ -10,12 +10,12 @@ use geyser_grpc_connector::channel_plugger::{ }; use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{ - create_geyser_autoconnection_task, Message, + create_geyser_autoconnection_task, }; use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, }; -use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message}; use tokio::time::{sleep, Duration}; use tracing::warn; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index 544389a..840451b 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -1,4 +1,4 @@ -use crate::GrpcSourceConfig; +use crate::{Attempt, GrpcSourceConfig, Message}; use async_stream::stream; use futures::channel::mpsc; use futures::{Stream, StreamExt}; @@ -26,17 +26,6 @@ use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; use yellowstone_grpc_proto::tonic::{Code, Status}; -type Attempt = u32; - -// wraps payload and status messages -// clone is required by broacast channel -#[derive(Clone)] -pub enum Message { - GeyserSubscribeUpdate(Box), - // connect (attempt=1) or reconnect(attempt=2..) - Connecting(Attempt), -} - enum ConnectionState>> { NotConnected(Attempt), Connecting(Attempt, JoinHandle>), diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 52c61a5..e8a4fc2 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,4 +1,4 @@ -use crate::GrpcSourceConfig; +use crate::{GrpcSourceConfig, Message}; use anyhow::bail; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; @@ -28,15 +28,6 @@ use yellowstone_grpc_proto::tonic::{Code, Status}; type Attempt = u32; -// wraps payload and status messages -// clone is required by broacast channel -#[derive(Clone)] -pub enum Message { - GeyserSubscribeUpdate(Box), - // connect (attempt=1) or reconnect(attempt=2..) - Connecting(Attempt), -} - enum ConnectionState>> { NotConnected(Attempt), Connecting(Attempt, JoinHandle>), @@ -76,7 +67,7 @@ pub fn create_geyser_autoconnection_task( subscribe_filter: SubscribeRequest, ) -> (AbortHandle, Receiver) { // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ - let (sender, receiver_stream) = tokio::sync::mpsc::channel::(1); + let (sender, receiver_channel) = tokio::sync::mpsc::channel::(1); let jh_geyser_task = tokio::spawn(async move { let mut state = State::NotConnected(0); @@ -312,7 +303,7 @@ pub fn create_geyser_autoconnection_task( } }); - (jh_geyser_task.abort_handle(), receiver_stream) + (jh_geyser_task.abort_handle(), receiver_channel) } #[cfg(test)] diff --git a/src/grpcmultiplex_fastestwins.rs b/src/grpcmultiplex_fastestwins.rs index b8ef7cb..b0f36f0 100644 --- a/src/grpcmultiplex_fastestwins.rs +++ b/src/grpcmultiplex_fastestwins.rs @@ -1,11 +1,11 @@ -use crate::grpc_subscription_autoreconnect_streams::Message; -use crate::grpc_subscription_autoreconnect_streams::Message::GeyserSubscribeUpdate; +use crate::Message; use async_stream::stream; use futures::{Stream, StreamExt}; use log::{info, warn}; use merge_streams::MergeStreams; use solana_sdk::clock::Slot; use yellowstone_grpc_proto::geyser::SubscribeUpdate; +use crate::Message::GeyserSubscribeUpdate; pub trait FromYellowstoneExtractor { // Target is something like ProducedBlock diff --git a/src/lib.rs b/src/lib.rs index 5de9710..a31d646 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,10 +2,7 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::time::Duration; -use yellowstone_grpc_proto::geyser::{ - CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, - SubscribeRequestFilterBlocksMeta, -}; +use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; pub mod channel_plugger; @@ -15,6 +12,17 @@ pub mod grpc_subscription_autoreconnect_tasks; pub mod grpcmultiplex_fastestwins; mod obfuscate; +type Attempt = u32; + +// wraps payload and status messages +// clone is required by broacast channel +#[derive(Clone)] +pub enum Message { + GeyserSubscribeUpdate(Box), + // connect (attempt=1) or reconnect(attempt=2..) + Connecting(Attempt), +} + #[derive(Clone, Debug)] pub struct GrpcConnectionTimeouts { pub connect_timeout: Duration, @@ -24,8 +32,8 @@ pub struct GrpcConnectionTimeouts { #[derive(Clone)] pub struct GrpcSourceConfig { - grpc_addr: String, - grpc_x_token: Option, + pub grpc_addr: String, + pub grpc_x_token: Option, tls_config: Option, timeouts: Option, }