From 32709aff8a49ab6ec1d5e3b5d0e4253f59a5c343 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 15 Dec 2023 10:20:41 +0100 Subject: [PATCH] move mock code --- .../workflows/{test.yml => build-check.yml} | 0 examples/stream_blocks_mainnet.rs | 4 +- .../experimental/mock_literpc_core.rs | 0 src/experimental/mod.rs | 3 +- src/grpc_subscription_autoreconnect.rs | 180 ++++++++++++++++++ 5 files changed, 183 insertions(+), 4 deletions(-) rename .github/workflows/{test.yml => build-check.yml} (100%) rename examples/literpc_core_model.rs => src/experimental/mock_literpc_core.rs (100%) create mode 100644 src/grpc_subscription_autoreconnect.rs diff --git a/.github/workflows/test.yml b/.github/workflows/build-check.yml similarity index 100% rename from .github/workflows/test.yml rename to .github/workflows/build-check.yml diff --git a/examples/stream_blocks_mainnet.rs b/examples/stream_blocks_mainnet.rs index 709baaa..75a7032 100644 --- a/examples/stream_blocks_mainnet.rs +++ b/examples/stream_blocks_mainnet.rs @@ -1,5 +1,3 @@ -mod literpc_core_model; - use std::collections::HashMap; use std::pin::pin; use futures::{Stream, StreamExt}; @@ -12,7 +10,7 @@ use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBloc use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use geyser_grpc_connector::grpc_subscription_autoreconnect::GrpcSourceConfig; use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, FromYellowstoneMapper}; -use crate::literpc_core_model::{map_produced_block, ProducedBlock}; +use geyser_grpc_connector::experimental::mock_literpc_core::{map_produced_block, ProducedBlock}; fn start_example_consumer(mut block_stream: impl Stream + Send + 'static) { tokio::spawn(async move { diff --git a/examples/literpc_core_model.rs b/src/experimental/mock_literpc_core.rs similarity index 100% rename from examples/literpc_core_model.rs rename to src/experimental/mock_literpc_core.rs diff --git a/src/experimental/mod.rs b/src/experimental/mod.rs index dc44ede..4a28a40 100644 --- a/src/experimental/mod.rs +++ b/src/experimental/mod.rs @@ -1 +1,2 @@ -pub mod grpcmultiplex_fastestwins_channels; \ No newline at end of file +pub mod grpcmultiplex_fastestwins_channels; +pub mod mock_literpc_core; \ No newline at end of file diff --git a/src/grpc_subscription_autoreconnect.rs b/src/grpc_subscription_autoreconnect.rs new file mode 100644 index 0000000..4c26898 --- /dev/null +++ b/src/grpc_subscription_autoreconnect.rs @@ -0,0 +1,180 @@ +use async_stream::stream; +use futures::{Stream, StreamExt}; +use itertools::Itertools; +use log::{debug, info, warn}; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; +use std::collections::HashMap; +use std::pin::{pin, Pin}; +use tokio::task::JoinHandle; +use tokio::time::{sleep, Duration}; +use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult}; +use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; +use yellowstone_grpc_proto::geyser::SubscribeUpdateBlockMeta; +use yellowstone_grpc_proto::geyser::{ + CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeUpdate, +}; +use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta; +use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; +use yellowstone_grpc_proto::tonic::{async_trait, Status}; + + +#[async_trait] +trait GrpcConnectionFactory: Clone { + // async fn connect() -> GeyserGrpcClientResult>+Sized>; + async fn connect_and_subscribe(&self) -> GeyserGrpcClientResult>>>>; +} + +#[derive(Clone, Debug)] +pub struct GrpcSourceConfig { + // symbolic name used in logs + pub label: String, + grpc_addr: String, + grpc_x_token: Option, + tls_config: Option, +} + +impl GrpcSourceConfig { + pub fn new(label: String, grpc_addr: String, grpc_x_token: Option) -> Self { + Self { + label, + grpc_addr, + grpc_x_token, + tls_config: None, + } + } +} + +enum ConnectionState>> { + NotConnected, + Connecting(JoinHandle>), + Ready(S), + WaitReconnect, +} + +// Takes geyser filter for geyser, connect to Geyser and return a generic stream of SubscribeUpdate +// note: stream never terminates +pub fn create_geyser_reconnecting_stream( + grpc_source: GrpcSourceConfig, + commitment_config: CommitmentConfig, + // TODO do we want Option +) -> impl Stream> { + let label = grpc_source.label.clone(); + + // solana_sdk -> yellowstone + let commitment_level = match commitment_config.commitment { + solana_sdk::commitment_config::CommitmentLevel::Confirmed => yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed, + solana_sdk::commitment_config::CommitmentLevel::Finalized => yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized, + _ => panic!("Only CONFIRMED and FINALIZED is supported/suggested"), + }; + + // NOT_CONNECTED; CONNECTING + let mut state = ConnectionState::NotConnected; + + // in case of cancellation, we restart from here: + // thus we want to keep the progression in a state object outside the stream! makro + stream! { + loop{ + let yield_value; + (state, yield_value) = match state { + ConnectionState::NotConnected => { + + let connection_task = tokio::spawn({ + let addr = grpc_source.grpc_addr.clone(); + let token = grpc_source.grpc_x_token.clone(); + let config = grpc_source.tls_config.clone(); + // let (block_filter, blockmeta_filter) = blocks_filters.clone(); + async move { + + let connect_result = GeyserGrpcClient::connect_with_timeout( + addr, token, config, + Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await; + let mut client = connect_result?; + + // TODO make filter configurable for caller + let mut blocks_subs = HashMap::new(); + blocks_subs.insert( + "client".to_string(), + SubscribeRequestFilterBlocks { + account_include: Default::default(), + include_transactions: Some(true), + include_accounts: Some(false), + include_entries: Some(false), + }, + ); + + let mut blocksmeta_subs = HashMap::new(); + blocksmeta_subs.insert( + "client".to_string(), + SubscribeRequestFilterBlocksMeta {}, + ); + + // Connected; + let subscribe_result = client + .subscribe_once( + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + blocks_subs, + blocksmeta_subs, + Some(commitment_level), + Default::default(), + None, + ).await; + + subscribe_result + } + }); + + (ConnectionState::Connecting(connection_task), None) + } + ConnectionState::Connecting(connection_task) => { + let subscribe_result = connection_task.await; + + match subscribe_result { + Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(subscribed_stream), None), + Ok(Err(geyser_error)) => { + // TODO identify non-recoverable errors and cancel stream + warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_error); + (ConnectionState::WaitReconnect, None) + }, + Err(geyser_grpc_task_error) => { + panic!("Task aborted - should not happen :{geyser_grpc_task_error}"); + } + } + + } + ConnectionState::Ready(mut geyser_stream) => { + + //for await update_message in geyser_stream { + match geyser_stream.next().await { + Some(Ok(update_message)) => { + info!(">message on {}", label); + (ConnectionState::Ready(geyser_stream), Some(update_message)) + } + Some(Err(tonic_status)) => { + // TODO identify non-recoverable errors and cancel stream + warn!("Receive error on {} - retrying: {:?}", label, tonic_status); + (ConnectionState::WaitReconnect, None) + } + None => { + //TODO should not arrive. Mean the stream close. + warn!("Geyzer stream close on {} - retrying", label); + (ConnectionState::WaitReconnect, None) + } + } + //} // -- production loop + + } + ConnectionState::WaitReconnect => { + // TODO implement backoff + sleep(Duration::from_secs(1)).await; + (ConnectionState::NotConnected, None) + } + }; // -- match + yield yield_value + } + + } // -- stream! +}