From afefb1847bed10044faa5413dbc10423913ba379 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 19 Jan 2024 11:34:45 +0100 Subject: [PATCH] cleanup --- Cargo.lock | 1 + Cargo.toml | 1 + examples/stream_blocks_autoconnect.rs | 17 ++- examples/stream_blocks_mainnet.rs | 6 +- examples/stream_blocks_single.rs | 9 +- ...grpc_subscription_autoreconnect_streams.rs | 4 +- src/grpc_subscription_autoreconnect_tasks.rs | 116 ++++++++++-------- src/lib.rs | 10 +- 8 files changed, 92 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 898c6f9..c1705cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1250,6 +1250,7 @@ dependencies = [ "itertools 0.10.5", "log", "merge-streams", + "solana-logger", "solana-sdk", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 9f88a6f..ec0e897 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,3 +33,4 @@ bincode = "1.3.3" [dev-dependencies] tracing-subscriber = "0.3.16" +solana-logger = "1" diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs index 0b019b6..6b8d2a0 100644 --- a/examples/stream_blocks_autoconnect.rs +++ b/examples/stream_blocks_autoconnect.rs @@ -5,19 +5,19 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; -use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{ - create_geyser_reconnecting_stream, +use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; +use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{ + create_geyser_autoconnection_task, Message, }; use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, }; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; use tokio::time::{sleep, Duration}; use tracing::warn; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::prost::Message as _; -use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_autoconnection_task, Message}; -use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; fn start_example_blockmini_consumer( multiplex_stream: impl Stream + Send + 'static, @@ -70,6 +70,7 @@ impl FromYellowstoneExtractor for BlockMiniExtractor { } } +#[warn(dead_code)] enum TestCases { Basic, SlowReceiverStartup, @@ -79,14 +80,12 @@ enum TestCases { } const TEST_CASE: TestCases = TestCases::TemporaryLaggingReceiver; - #[tokio::main] pub async fn main() { // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace tracing_subscriber::fmt::init(); // console_subscriber::init(); - let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); @@ -107,19 +106,18 @@ pub async fn main() { info!("Write Block stream.."); - let (jh_geyser_task, mut green_stream) = create_geyser_autoconnection_task( + let (jh_geyser_task, mut message_channel) = create_geyser_autoconnection_task( green_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), ); tokio::spawn(async move { - if let TestCases::SlowReceiverStartup = TEST_CASE { sleep(Duration::from_secs(5)).await; } let mut message_count = 0; - while let Some(message) = green_stream.recv().await { + while let Some(message) = message_channel.recv().await { if let TestCases::AbortTaskFromOutside = TEST_CASE { if message_count > 5 { info!("(testcase) aborting task from outside"); @@ -148,7 +146,6 @@ pub async fn main() { sleep(Duration::from_millis(1500)).await; } } - } warn!("Stream aborted"); }); diff --git a/examples/stream_blocks_mainnet.rs b/examples/stream_blocks_mainnet.rs index 47861be..a8e0836 100644 --- a/examples/stream_blocks_mainnet.rs +++ b/examples/stream_blocks_mainnet.rs @@ -21,16 +21,14 @@ use solana_sdk::signature::Signature; use solana_sdk::transaction::TransactionError; use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock; -use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{ - create_geyser_reconnecting_stream, -}; +use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, }; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; use tokio::time::{sleep, Duration}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; -use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; fn start_example_block_consumer( multiplex_stream: impl Stream + Send + 'static, diff --git a/examples/stream_blocks_single.rs b/examples/stream_blocks_single.rs index e490c7c..f5db555 100644 --- a/examples/stream_blocks_single.rs +++ b/examples/stream_blocks_single.rs @@ -5,16 +5,18 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; -use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{create_geyser_reconnecting_stream, Message}; +use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{ + create_geyser_reconnecting_stream, Message, +}; use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, }; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; use tokio::time::{sleep, Duration}; use tracing::warn; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::prost::Message as _; -use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; fn start_example_blockmini_consumer( multiplex_stream: impl Stream + Send + 'static, @@ -93,12 +95,11 @@ pub async fn main() { info!("Write Block stream.."); - let green_stream= create_geyser_reconnecting_stream( + let green_stream = create_geyser_reconnecting_stream( green_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), ); - tokio::spawn(async move { let mut green_stream = pin!(green_stream); while let Some(message) = green_stream.next().await { diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index 1200647..544389a 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -1,3 +1,4 @@ +use crate::GrpcSourceConfig; use async_stream::stream; use futures::channel::mpsc; use futures::{Stream, StreamExt}; @@ -24,7 +25,6 @@ use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue; use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; use yellowstone_grpc_proto::tonic::{Code, Status}; -use crate::GrpcSourceConfig; type Attempt = u32; @@ -155,8 +155,8 @@ pub fn create_geyser_reconnecting_stream( #[cfg(test)] mod tests { - use crate::GrpcConnectionTimeouts; use super::*; + use crate::GrpcConnectionTimeouts; #[tokio::test] async fn test_debug_no_secrets() { diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index d85db6e..b21411e 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,3 +1,5 @@ +use crate::GrpcSourceConfig; +use anyhow::bail; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; use solana_sdk::commitment_config::CommitmentConfig; @@ -6,12 +8,11 @@ use std::fmt::{Debug, Display}; use std::future::Future; use std::pin::Pin; use std::time::Duration; -use anyhow::bail; use tokio::sync::mpsc::error::{SendError, SendTimeoutError}; use tokio::sync::mpsc::Receiver; -use tokio::task::JoinHandle; +use tokio::task::{AbortHandle, JoinHandle}; use tokio::time::error::Elapsed; -use tokio::time::{Instant, sleep, timeout, Timeout}; +use tokio::time::{sleep, timeout, Instant, Timeout}; use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::{ @@ -24,7 +25,6 @@ use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue; use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; use yellowstone_grpc_proto::tonic::{Code, Status}; -use crate::GrpcSourceConfig; type Attempt = u32; @@ -37,11 +37,6 @@ pub enum Message { Connecting(Attempt), } -#[derive(Debug, Clone)] -pub enum AutoconnectionError { - AbortedFatalError, -} - enum ConnectionState>> { NotConnected(Attempt), Connecting(Attempt, JoinHandle>), @@ -79,7 +74,7 @@ enum State>, F: Interceptor> { pub fn create_geyser_autoconnection_task( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, -) -> (JoinHandle>, Receiver) { +) -> (AbortHandle, Receiver) { // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ let (sender, receiver_stream) = tokio::sync::mpsc::channel::(1); @@ -119,9 +114,15 @@ pub fn create_geyser_autoconnection_task( match connect_result { Ok(client) => State::Connected(attempt, client), - Err(GeyserGrpcClientError::InvalidUri(_)) => State::FatalError(attempt, FatalErrorReason::ConfigurationError), - Err(GeyserGrpcClientError::MetadataValueError(_)) => State::FatalError(attempt, FatalErrorReason::ConfigurationError), - Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => State::FatalError(attempt, FatalErrorReason::ConfigurationError), + Err(GeyserGrpcClientError::InvalidUri(_)) => { + State::FatalError(attempt, FatalErrorReason::ConfigurationError) + } + Err(GeyserGrpcClientError::MetadataValueError(_)) => { + State::FatalError(attempt, FatalErrorReason::ConfigurationError) + } + Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => { + State::FatalError(attempt, FatalErrorReason::ConfigurationError) + } Err(GeyserGrpcClientError::TonicError(tonic_error)) => { warn!( "! connect failed on {} - aborting: {:?}", @@ -197,30 +198,28 @@ pub fn create_geyser_autoconnection_task( sleep(Duration::from_secs_f32(backoff_secs)).await; State::NotConnected(attempt) } - State::FatalError(_attempt, reason) => { - match reason { - FatalErrorReason::DownstreamChannelClosed => { - warn!("! downstream closed - aborting"); - return Err(AutoconnectionError::AbortedFatalError); - } - FatalErrorReason::ConfigurationError => { - warn!("! fatal configuration error - aborting"); - return Err(AutoconnectionError::AbortedFatalError); - } - FatalErrorReason::NetworkError => { - warn!("! fatal network error - aborting"); - return Err(AutoconnectionError::AbortedFatalError); - } - FatalErrorReason::SubscribeError => { - warn!("! fatal grpc subscribe error - aborting"); - return Err(AutoconnectionError::AbortedFatalError); - } - FatalErrorReason::Misc => { - error!("! fatal misc error grpc connection - aborting"); - return Err(AutoconnectionError::AbortedFatalError); - } + State::FatalError(_attempt, reason) => match reason { + FatalErrorReason::DownstreamChannelClosed => { + warn!("! downstream closed - aborting"); + return; } - } + FatalErrorReason::ConfigurationError => { + warn!("! fatal configuration error - aborting"); + return; + } + FatalErrorReason::NetworkError => { + warn!("! fatal network error - aborting"); + return; + } + FatalErrorReason::SubscribeError => { + warn!("! fatal grpc subscribe error - aborting"); + return; + } + FatalErrorReason::Misc => { + error!("! fatal misc error grpc connection - aborting"); + return; + } + }, State::WaitReconnect(attempt) => { let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0); info!( @@ -240,17 +239,30 @@ pub fn create_geyser_autoconnection_task( // TODO extract timeout param; TODO respect startup // emit warning if message not received // note: first send never blocks - let warning_threshold = if messages_forwarded == 1 { Duration::from_millis(3000) } else { Duration::from_millis(500) }; + let warning_threshold = if messages_forwarded == 1 { + Duration::from_millis(3000) + } else { + Duration::from_millis(500) + }; let started_at = Instant::now(); - match sender.send_timeout(Message::GeyserSubscribeUpdate(Box::new(update_message)), warning_threshold).await { + match sender + .send_timeout( + Message::GeyserSubscribeUpdate(Box::new(update_message)), + warning_threshold, + ) + .await + { Ok(()) => { messages_forwarded += 1; if messages_forwarded == 1 { // note: first send never blocks - do not print time as this is a lie trace!("queued first update message"); } else { - trace!("queued update message {} in {:.02}ms", - messages_forwarded, started_at.elapsed().as_secs_f32() * 1000.0); + trace!( + "queued update message {} in {:.02}ms", + messages_forwarded, + started_at.elapsed().as_secs_f32() * 1000.0 + ); } continue 'recv_loop; } @@ -260,19 +272,27 @@ pub fn create_geyser_autoconnection_task( match sender.send(the_message).await { Ok(()) => { messages_forwarded += 1; - trace!("queued delayed update message {} in {:.02}ms", - messages_forwarded, started_at.elapsed().as_secs_f32() * 1000.0); + trace!( + "queued delayed update message {} in {:.02}ms", + messages_forwarded, + started_at.elapsed().as_secs_f32() * 1000.0 + ); } - Err(_send_error ) => { + Err(_send_error) => { warn!("downstream receiver closed, message is lost - aborting"); - break 'recv_loop State::FatalError(attempt, FatalErrorReason::DownstreamChannelClosed); + break 'recv_loop State::FatalError( + attempt, + FatalErrorReason::DownstreamChannelClosed, + ); } } - } Err(SendTimeoutError::Closed(_)) => { warn!("downstream receiver closed - aborting"); - break 'recv_loop State::FatalError(attempt, FatalErrorReason::DownstreamChannelClosed); + break 'recv_loop State::FatalError( + attempt, + FatalErrorReason::DownstreamChannelClosed, + ); } } // { @@ -307,13 +327,13 @@ pub fn create_geyser_autoconnection_task( } }); - (jh_geyser_task, receiver_stream) + (jh_geyser_task.abort_handle(), receiver_stream) } #[cfg(test)] mod tests { - use crate::GrpcConnectionTimeouts; use super::*; + use crate::GrpcConnectionTimeouts; #[tokio::test] async fn test_debug_no_secrets() { diff --git a/src/lib.rs b/src/lib.rs index dee574b..358d941 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,17 +1,20 @@ +use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::time::Duration; -use solana_sdk::commitment_config::CommitmentConfig; -use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta}; +use yellowstone_grpc_proto::geyser::{ + CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, + SubscribeRequestFilterBlocksMeta, +}; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; +mod channel_plugger; pub mod grpc_subscription; pub mod grpc_subscription_autoreconnect_streams; pub mod grpc_subscription_autoreconnect_tasks; pub mod grpcmultiplex_fastestwins; mod obfuscate; - #[derive(Clone, Debug)] pub struct GrpcConnectionTimeouts { pub connect_timeout: Duration, @@ -68,7 +71,6 @@ impl GrpcSourceConfig { } } - #[derive(Clone)] pub struct GeyserFilter(pub CommitmentConfig);