diff --git a/examples/stream_blocks_mainnet.rs b/examples/stream_blocks_mainnet.rs index d3c3326..47861be 100644 --- a/examples/stream_blocks_mainnet.rs +++ b/examples/stream_blocks_mainnet.rs @@ -22,7 +22,7 @@ use solana_sdk::transaction::TransactionError; use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock; use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{ - create_geyser_reconnecting_stream, GeyserFilter, + create_geyser_reconnecting_stream, }; use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, @@ -30,7 +30,7 @@ use geyser_grpc_connector::grpcmultiplex_fastestwins::{ use tokio::time::{sleep, Duration}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; -use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig}; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; fn start_example_block_consumer( multiplex_stream: impl Stream + Send + 'static, diff --git a/examples/stream_blocks_single.rs b/examples/stream_blocks_single.rs index ecd59bb..2655598 100644 --- a/examples/stream_blocks_single.rs +++ b/examples/stream_blocks_single.rs @@ -6,7 +6,7 @@ use std::env; use std::pin::pin; use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{ - create_geyser_reconnecting_stream, GeyserFilter, + create_geyser_reconnecting_stream, }; use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, @@ -16,8 +16,8 @@ use tracing::warn; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::prost::Message as _; -use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_reconnecting_task, Message}; -use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig}; +use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_autoconnection_task, Message}; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; fn start_example_blockmini_consumer( multiplex_stream: impl Stream + Send + 'static, @@ -96,13 +96,13 @@ pub async fn main() { info!("Write Block stream.."); - let (jh_geyser_task, mut green_stream) = create_geyser_reconnecting_task( + let (jh_geyser_task, mut green_stream) = create_geyser_autoconnection_task( green_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), ); tokio::spawn(async move { - while let Ok(message) = green_stream.recv().await { + while let Some(message) = green_stream.recv().await { match message { Message::GeyserSubscribeUpdate(subscriber_update) => { // info!("got update: {:?}", subscriber_update.update_oneof.); diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index 687184d..1200647 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -44,74 +44,6 @@ enum ConnectionState>> { WaitReconnect(Attempt), } -#[derive(Clone)] -pub struct GeyserFilter(pub CommitmentConfig); - -impl GeyserFilter { - pub fn blocks_and_txs(&self) -> SubscribeRequest { - let mut blocks_subs = HashMap::new(); - blocks_subs.insert( - "client".to_string(), - SubscribeRequestFilterBlocks { - account_include: Default::default(), - include_transactions: Some(true), - include_accounts: Some(false), - include_entries: Some(false), - }, - ); - - SubscribeRequest { - slots: HashMap::new(), - accounts: Default::default(), - transactions: HashMap::new(), - entry: Default::default(), - blocks: blocks_subs, - blocks_meta: HashMap::new(), - commitment: Some(map_commitment_level(self.0) as i32), - accounts_data_slice: Default::default(), - ping: None, - } - } - - pub fn blocks_meta(&self) -> SubscribeRequest { - let mut blocksmeta_subs = HashMap::new(); - blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {}); - - SubscribeRequest { - slots: HashMap::new(), - accounts: Default::default(), - transactions: HashMap::new(), - entry: Default::default(), - blocks: HashMap::new(), - blocks_meta: blocksmeta_subs, - commitment: Some(map_commitment_level(self.0) as i32), - accounts_data_slice: Default::default(), - ping: None, - } - } -} - -fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel { - // solana_sdk -> yellowstone - match commitment_config.commitment { - solana_sdk::commitment_config::CommitmentLevel::Processed => { - yellowstone_grpc_proto::prelude::CommitmentLevel::Processed - } - solana_sdk::commitment_config::CommitmentLevel::Confirmed => { - yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed - } - solana_sdk::commitment_config::CommitmentLevel::Finalized => { - yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized - } - _ => { - panic!( - "unsupported commitment level {}", - commitment_config.commitment - ) - } - } -} - // Take geyser filter, connect to Geyser and return a generic stream of SubscribeUpdate // note: stream never terminates pub fn create_geyser_reconnecting_stream( diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index c9c955f..3fdfc93 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,4 +1,4 @@ -use crate::grpc_subscription_autoreconnect_tasks::TheState::*; +use crate::grpc_subscription_autoreconnect_tasks::State::*; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; use solana_sdk::commitment_config::CommitmentConfig; @@ -38,6 +38,11 @@ pub enum Message { Connecting(Attempt), } +#[derive(Debug, Clone)] +pub enum AutoconnectionError { + AbortedFatalError, +} + enum ConnectionState>> { NotConnected(Attempt), Connecting(Attempt, JoinHandle>), @@ -45,75 +50,7 @@ enum ConnectionState>> { WaitReconnect(Attempt), } -#[derive(Clone)] -pub struct GeyserFilter(pub CommitmentConfig); - -impl GeyserFilter { - pub fn blocks_and_txs(&self) -> SubscribeRequest { - let mut blocks_subs = HashMap::new(); - blocks_subs.insert( - "client".to_string(), - SubscribeRequestFilterBlocks { - account_include: Default::default(), - include_transactions: Some(true), - include_accounts: Some(false), - include_entries: Some(false), - }, - ); - - SubscribeRequest { - slots: HashMap::new(), - accounts: Default::default(), - transactions: HashMap::new(), - entry: Default::default(), - blocks: blocks_subs, - blocks_meta: HashMap::new(), - commitment: Some(map_commitment_level(self.0) as i32), - accounts_data_slice: Default::default(), - ping: None, - } - } - - pub fn blocks_meta(&self) -> SubscribeRequest { - let mut blocksmeta_subs = HashMap::new(); - blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {}); - - SubscribeRequest { - slots: HashMap::new(), - accounts: Default::default(), - transactions: HashMap::new(), - entry: Default::default(), - blocks: HashMap::new(), - blocks_meta: blocksmeta_subs, - commitment: Some(map_commitment_level(self.0) as i32), - accounts_data_slice: Default::default(), - ping: None, - } - } -} - -fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel { - // solana_sdk -> yellowstone - match commitment_config.commitment { - solana_sdk::commitment_config::CommitmentLevel::Processed => { - yellowstone_grpc_proto::prelude::CommitmentLevel::Processed - } - solana_sdk::commitment_config::CommitmentLevel::Confirmed => { - yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed - } - solana_sdk::commitment_config::CommitmentLevel::Finalized => { - yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized - } - _ => { - panic!( - "unsupported commitment level {}", - commitment_config.commitment - ) - } - } -} - -enum TheState>, F: Interceptor> { +enum State>, F: Interceptor> { NotConnected(Attempt), Connected(Attempt, GeyserGrpcClient), Ready(Attempt, S), @@ -125,10 +62,10 @@ enum TheState>, F: Interceptor> } /// return handler will exit on fatal error -pub fn create_geyser_reconnecting_task( +pub fn create_geyser_autoconnection_task( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, -) -> (JoinHandle<()>, Receiver) { +) -> (JoinHandle>, Receiver) { // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ let (sender, receiver_stream) = tokio::sync::mpsc::channel::(1); @@ -249,16 +186,16 @@ pub fn create_geyser_reconnecting_task( FatalError(_) => { // TOOD what to do error!("! fatal error grpc connection - aborting"); - bail!("! fatal error grpc connection - aborting"); + return Err(AutoconnectionError::AbortedFatalError); } - TheState::WaitReconnect(attempt) => { + State::WaitReconnect(attempt) => { let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0); info!( "! waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source ); sleep(Duration::from_secs_f32(backoff_secs)).await; - TheState::NotConnected(attempt) + State::NotConnected(attempt) } Ready(attempt, mut geyser_stream) => { 'recv_loop: loop { @@ -274,27 +211,27 @@ pub fn create_geyser_reconnecting_task( match sender.send_timeout(Message::GeyserSubscribeUpdate(Box::new(update_message)), warning_threshold).await { Ok(()) => { messages_forwared += 1; - trace!("sent update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32()); + trace!("sent update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32() * 1000.0); continue 'recv_loop; } - Err(SendTimeoutError::Timeout(_)) => { + Err(SendTimeoutError::Timeout(the_message)) => { warn!("downstream receiver did not pick put message for {}ms - keep waiting", warning_threshold.as_millis()); - match sender.send(Message::GeyserSubscribeUpdate(Box::new(update_message))).await { + match sender.send(the_message).await { Ok(()) => { messages_forwared += 1; - trace!("sent delayed update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32()); + trace!("sent delayed update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32() * 1000.0); } Err(_send_error ) => { warn!("downstream receiver closed, message is lost - aborting"); - break 'recv_loop TheState::FatalError(attempt); + break 'recv_loop State::FatalError(attempt); } } } Err(SendTimeoutError::Closed(_)) => { warn!("downstream receiver closed - aborting"); - break 'recv_loop TheState::FatalError(attempt); + break 'recv_loop State::FatalError(attempt); } } // { @@ -316,11 +253,11 @@ pub fn create_geyser_reconnecting_task( Some(Err(tonic_status)) => { // all tonic errors are recoverable warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status); - break 'recv_loop TheState::WaitReconnect(attempt); + break 'recv_loop State::WaitReconnect(attempt); } None => { warn!("geyser stream closed on {} - retrying", grpc_source); - break 'recv_loop TheState::WaitReconnect(attempt); + break 'recv_loop State::WaitReconnect(attempt); } } } // -- end loop diff --git a/src/lib.rs b/src/lib.rs index 41516f3..dee574b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,8 @@ +use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::time::Duration; +use solana_sdk::commitment_config::CommitmentConfig; +use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta}; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; pub mod grpc_subscription; @@ -64,3 +67,72 @@ impl GrpcSourceConfig { } } } + + +#[derive(Clone)] +pub struct GeyserFilter(pub CommitmentConfig); + +impl GeyserFilter { + pub fn blocks_and_txs(&self) -> SubscribeRequest { + let mut blocks_subs = HashMap::new(); + blocks_subs.insert( + "client".to_string(), + SubscribeRequestFilterBlocks { + account_include: Default::default(), + include_transactions: Some(true), + include_accounts: Some(false), + include_entries: Some(false), + }, + ); + + SubscribeRequest { + slots: HashMap::new(), + accounts: Default::default(), + transactions: HashMap::new(), + entry: Default::default(), + blocks: blocks_subs, + blocks_meta: HashMap::new(), + commitment: Some(map_commitment_level(self.0) as i32), + accounts_data_slice: Default::default(), + ping: None, + } + } + + pub fn blocks_meta(&self) -> SubscribeRequest { + let mut blocksmeta_subs = HashMap::new(); + blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {}); + + SubscribeRequest { + slots: HashMap::new(), + accounts: Default::default(), + transactions: HashMap::new(), + entry: Default::default(), + blocks: HashMap::new(), + blocks_meta: blocksmeta_subs, + commitment: Some(map_commitment_level(self.0) as i32), + accounts_data_slice: Default::default(), + ping: None, + } + } +} + +fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel { + // solana_sdk -> yellowstone + match commitment_config.commitment { + solana_sdk::commitment_config::CommitmentLevel::Processed => { + yellowstone_grpc_proto::prelude::CommitmentLevel::Processed + } + solana_sdk::commitment_config::CommitmentLevel::Confirmed => { + yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed + } + solana_sdk::commitment_config::CommitmentLevel::Finalized => { + yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized + } + _ => { + panic!( + "unsupported commitment level {}", + commitment_config.commitment + ) + } + } +}