move Message type

This commit is contained in:
GroovieGermanikus 2024-01-22 18:26:26 +01:00
parent b3808da95e
commit 1d0b7890c3
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
5 changed files with 22 additions and 34 deletions

View File

@ -10,12 +10,12 @@ use geyser_grpc_connector::channel_plugger::{
}; };
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{ use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{
create_geyser_autoconnection_task, Message, create_geyser_autoconnection_task,
}; };
use geyser_grpc_connector::grpcmultiplex_fastestwins::{ use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor, create_multiplexed_stream, FromYellowstoneExtractor,
}; };
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use tracing::warn; use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;

View File

@ -1,4 +1,4 @@
use crate::GrpcSourceConfig; use crate::{Attempt, GrpcSourceConfig, Message};
use async_stream::stream; use async_stream::stream;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
@ -26,17 +26,6 @@ use yellowstone_grpc_proto::tonic::service::Interceptor;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use yellowstone_grpc_proto::tonic::{Code, Status}; use yellowstone_grpc_proto::tonic::{Code, Status};
type Attempt = u32;
// wraps payload and status messages
// clone is required by broacast channel
#[derive(Clone)]
pub enum Message {
GeyserSubscribeUpdate(Box<SubscribeUpdate>),
// connect (attempt=1) or reconnect(attempt=2..)
Connecting(Attempt),
}
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> { enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected(Attempt), NotConnected(Attempt),
Connecting(Attempt, JoinHandle<GeyserGrpcClientResult<S>>), Connecting(Attempt, JoinHandle<GeyserGrpcClientResult<S>>),

View File

@ -1,4 +1,4 @@
use crate::GrpcSourceConfig; use crate::{GrpcSourceConfig, Message};
use anyhow::bail; use anyhow::bail;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level}; use log::{debug, error, info, log, trace, warn, Level};
@ -28,15 +28,6 @@ use yellowstone_grpc_proto::tonic::{Code, Status};
type Attempt = u32; type Attempt = u32;
// wraps payload and status messages
// clone is required by broacast channel
#[derive(Clone)]
pub enum Message {
GeyserSubscribeUpdate(Box<SubscribeUpdate>),
// connect (attempt=1) or reconnect(attempt=2..)
Connecting(Attempt),
}
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> { enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected(Attempt), NotConnected(Attempt),
Connecting(Attempt, JoinHandle<GeyserGrpcClientResult<S>>), Connecting(Attempt, JoinHandle<GeyserGrpcClientResult<S>>),
@ -76,7 +67,7 @@ pub fn create_geyser_autoconnection_task(
subscribe_filter: SubscribeRequest, subscribe_filter: SubscribeRequest,
) -> (AbortHandle, Receiver<Message>) { ) -> (AbortHandle, Receiver<Message>) {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
let (sender, receiver_stream) = tokio::sync::mpsc::channel::<Message>(1); let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);
let jh_geyser_task = tokio::spawn(async move { let jh_geyser_task = tokio::spawn(async move {
let mut state = State::NotConnected(0); let mut state = State::NotConnected(0);
@ -312,7 +303,7 @@ pub fn create_geyser_autoconnection_task(
} }
}); });
(jh_geyser_task.abort_handle(), receiver_stream) (jh_geyser_task.abort_handle(), receiver_channel)
} }
#[cfg(test)] #[cfg(test)]

View File

@ -1,11 +1,11 @@
use crate::grpc_subscription_autoreconnect_streams::Message; use crate::Message;
use crate::grpc_subscription_autoreconnect_streams::Message::GeyserSubscribeUpdate;
use async_stream::stream; use async_stream::stream;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use log::{info, warn}; use log::{info, warn};
use merge_streams::MergeStreams; use merge_streams::MergeStreams;
use solana_sdk::clock::Slot; use solana_sdk::clock::Slot;
use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use crate::Message::GeyserSubscribeUpdate;
pub trait FromYellowstoneExtractor { pub trait FromYellowstoneExtractor {
// Target is something like ProducedBlock // Target is something like ProducedBlock

View File

@ -2,10 +2,7 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::{Debug, Display}; use std::fmt::{Debug, Display};
use std::time::Duration; use std::time::Duration;
use yellowstone_grpc_proto::geyser::{ use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate};
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks,
SubscribeRequestFilterBlocksMeta,
};
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
pub mod channel_plugger; pub mod channel_plugger;
@ -15,6 +12,17 @@ pub mod grpc_subscription_autoreconnect_tasks;
pub mod grpcmultiplex_fastestwins; pub mod grpcmultiplex_fastestwins;
mod obfuscate; mod obfuscate;
type Attempt = u32;
// wraps payload and status messages
// clone is required by broacast channel
#[derive(Clone)]
pub enum Message {
GeyserSubscribeUpdate(Box<SubscribeUpdate>),
// connect (attempt=1) or reconnect(attempt=2..)
Connecting(Attempt),
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct GrpcConnectionTimeouts { pub struct GrpcConnectionTimeouts {
pub connect_timeout: Duration, pub connect_timeout: Duration,
@ -24,8 +32,8 @@ pub struct GrpcConnectionTimeouts {
#[derive(Clone)] #[derive(Clone)]
pub struct GrpcSourceConfig { pub struct GrpcSourceConfig {
grpc_addr: String, pub grpc_addr: String,
grpc_x_token: Option<String>, pub grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>, tls_config: Option<ClientTlsConfig>,
timeouts: Option<GrpcConnectionTimeouts>, timeouts: Option<GrpcConnectionTimeouts>,
} }