From 48efa7bbe84b2f127245ea9054caa5232403cdda Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 11 Mar 2024 15:27:32 +0100 Subject: [PATCH 1/7] restart attempt counter in ready state --- ...net.rs => stream_blocks_mainnet_stream.rs} | 0 ...grpc_subscription_autoreconnect_streams.rs | 23 +++--- src/grpc_subscription_autoreconnect_tasks.rs | 73 +++++++------------ 3 files changed, 39 insertions(+), 57 deletions(-) rename examples/{stream_blocks_mainnet.rs => stream_blocks_mainnet_stream.rs} (100%) diff --git a/examples/stream_blocks_mainnet.rs b/examples/stream_blocks_mainnet_stream.rs similarity index 100% rename from examples/stream_blocks_mainnet.rs rename to examples/stream_blocks_mainnet_stream.rs diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index e070292..cb1b26d 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -12,7 +12,7 @@ use yellowstone_grpc_proto::tonic::Status; enum ConnectionState>> { NotConnected(Attempt), Connecting(Attempt, JoinHandle>), - Ready(Attempt, S), + Ready(S), WaitReconnect(Attempt), } @@ -32,8 +32,7 @@ pub fn create_geyser_reconnecting_stream( (state, yield_value) = match state { - ConnectionState::NotConnected(mut attempt) => { - attempt += 1; + ConnectionState::NotConnected(attempt) => { let connection_task = tokio::spawn({ let addr = grpc_source.grpc_addr.clone(); @@ -43,7 +42,7 @@ pub fn create_geyser_reconnecting_stream( let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout); let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout); let subscribe_filter = subscribe_filter.clone(); - log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr); + log!(if attempt >= 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt + 1, addr); async move { let connect_result = GeyserGrpcClient::connect_with_timeout( @@ -67,18 +66,18 @@ pub fn create_geyser_reconnecting_stream( } }); - (ConnectionState::Connecting(attempt, connection_task), Message::Connecting(attempt)) + (ConnectionState::Connecting(attempt + 1, connection_task), Message::Connecting(attempt + 1)) } ConnectionState::Connecting(attempt, connection_task) => { let subscribe_result = connection_task.await; match subscribe_result { - Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(attempt, subscribed_stream), Message::Connecting(attempt)), + Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(subscribed_stream), Message::Connecting(attempt)), Ok(Err(geyser_error)) => { // ATM we consider all errors recoverable warn!("subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error); - (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) + (ConnectionState::WaitReconnect(attempt + 1), Message::Connecting(attempt)) }, Err(geyser_grpc_task_error) => { panic!("task aborted - should not happen :{geyser_grpc_task_error}"); @@ -87,27 +86,27 @@ pub fn create_geyser_reconnecting_stream( } - ConnectionState::Ready(attempt, mut geyser_stream) => { + ConnectionState::Ready(mut geyser_stream) => { let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await { Ok(Some(Ok(update_message))) => { trace!("> recv update message from {}", grpc_source); - (ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message))) + (ConnectionState::Ready(geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message))) } Ok(Some(Err(tonic_status))) => { // ATM we consider all errors recoverable warn!("error on {} - retrying: {:?}", grpc_source, tonic_status); - (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) + (ConnectionState::WaitReconnect(0), Message::Connecting(0)) } Ok(None) => { // should not arrive here, Mean the stream close. warn!("geyser stream closed on {} - retrying", grpc_source); - (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) + (ConnectionState::WaitReconnect(0), Message::Connecting(0)) } Err(_elapsed) => { // timeout warn!("geyser stream timeout on {} - retrying", grpc_source); - (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) + (ConnectionState::WaitReconnect(0), Message::Connecting(0)) } } diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index bf786e5..a519935 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -15,8 +15,9 @@ type Attempt = u32; enum ConnectionState>, F: Interceptor> { NotConnected(Attempt), - Connected(Attempt, GeyserGrpcClient), - Ready(Attempt, S), + // connected but not subscribed + Connecting(Attempt, GeyserGrpcClient), + Ready(S), // error states RecoverableConnectionError(Attempt), // non-recoverable error @@ -59,8 +60,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( loop { state = match state { - ConnectionState::NotConnected(mut attempt) => { - attempt += 1; + ConnectionState::NotConnected(attempt) => { let addr = grpc_source.grpc_addr.clone(); let token = grpc_source.grpc_x_token.clone(); @@ -68,13 +68,13 @@ pub fn create_geyser_autoconnection_task_with_mpsc( let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout); let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout); log!( - if attempt > 1 { + if attempt > 0 { Level::Warn } else { Level::Debug }, - "Connecting attempt #{} to {}", - attempt, + "Connecting attempt {} to {}", + attempt + 1, addr ); let connect_result = GeyserGrpcClient::connect_with_timeout( @@ -88,47 +88,39 @@ pub fn create_geyser_autoconnection_task_with_mpsc( .await; match connect_result { - Ok(client) => ConnectionState::Connected(attempt, client), + Ok(client) => ConnectionState::Connecting(attempt + 1, client), Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError( - attempt, - FatalErrorReason::ConfigurationError, - ), - Err(GeyserGrpcClientError::MetadataValueError(_)) => { - ConnectionState::FatalError( - attempt, - FatalErrorReason::ConfigurationError, - ) + attempt + 1, FatalErrorReason::ConfigurationError, + ), Err(GeyserGrpcClientError::MetadataValueError(_)) => { + ConnectionState::FatalError(attempt + 1, FatalErrorReason::ConfigurationError) } Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => { - ConnectionState::FatalError( - attempt, - FatalErrorReason::ConfigurationError, - ) + ConnectionState::FatalError(attempt + 1, FatalErrorReason::ConfigurationError) } Err(GeyserGrpcClientError::TonicError(tonic_error)) => { warn!( "connect failed on {} - aborting: {:?}", grpc_source, tonic_error ); - ConnectionState::FatalError(attempt, FatalErrorReason::NetworkError) + ConnectionState::FatalError(attempt + 1, FatalErrorReason::NetworkError) } Err(GeyserGrpcClientError::TonicStatus(tonic_status)) => { warn!( "connect failed on {} - retrying: {:?}", grpc_source, tonic_status ); - ConnectionState::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt + 1) } Err(GeyserGrpcClientError::SubscribeSendError(send_error)) => { warn!( "connect failed with send error on {} - retrying: {:?}", grpc_source, send_error ); - ConnectionState::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt + 1) } } } - ConnectionState::Connected(attempt, mut client) => { + ConnectionState::Connecting(attempt, mut client) => { let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout); let subscribe_filter = subscribe_filter.clone(); @@ -143,14 +135,14 @@ pub fn create_geyser_autoconnection_task_with_mpsc( match subscribe_result_timeout { Ok(subscribe_result) => { match subscribe_result { - Ok(geyser_stream) => ConnectionState::Ready(attempt, geyser_stream), + Ok(geyser_stream) => ConnectionState::Ready(geyser_stream), Err(GeyserGrpcClientError::TonicError(_)) => { warn!("subscribe failed on {} - retrying", grpc_source); - ConnectionState::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt + 1) } Err(GeyserGrpcClientError::TonicStatus(_)) => { warn!("subscribe failed on {} - retrying", grpc_source); - ConnectionState::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt + 1) } // non-recoverable Err(unrecoverable_error) => { @@ -158,10 +150,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( "subscribe to {} failed with unrecoverable error: {}", grpc_source, unrecoverable_error ); - ConnectionState::FatalError( - attempt, - FatalErrorReason::SubscribeError, - ) + ConnectionState::FatalError(attempt + 1, FatalErrorReason::SubscribeError) } } } @@ -170,7 +159,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( "subscribe failed with timeout on {} - retrying", grpc_source ); - ConnectionState::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt + 1) } } } @@ -210,7 +199,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( sleep(Duration::from_secs_f32(backoff_secs)).await; ConnectionState::NotConnected(attempt) } - ConnectionState::Ready(attempt, mut geyser_stream) => { + ConnectionState::Ready(mut geyser_stream) => { let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); 'recv_loop: loop { match timeout( @@ -263,36 +252,30 @@ pub fn create_geyser_autoconnection_task_with_mpsc( } Err(_send_error) => { warn!("downstream receiver closed, message is lost - aborting"); - break 'recv_loop ConnectionState::FatalError( - attempt, - FatalErrorReason::DownstreamChannelClosed, - ); + break 'recv_loop ConnectionState::FatalError(0, FatalErrorReason::DownstreamChannelClosed); } } } Err(SendTimeoutError::Closed(_)) => { warn!("downstream receiver closed - aborting"); - break 'recv_loop ConnectionState::FatalError( - attempt, - FatalErrorReason::DownstreamChannelClosed, - ); + break 'recv_loop ConnectionState::FatalError(0, FatalErrorReason::DownstreamChannelClosed); } } } Ok(Some(Err(tonic_status))) => { // all tonic errors are recoverable warn!("error on {} - retrying: {:?}", grpc_source, tonic_status); - break 'recv_loop ConnectionState::WaitReconnect(attempt); + break 'recv_loop ConnectionState::WaitReconnect(0); } Ok(None) => { warn!("geyser stream closed on {} - retrying", grpc_source); - break 'recv_loop ConnectionState::WaitReconnect(attempt); + break 'recv_loop ConnectionState::WaitReconnect(0); } Err(_elapsed) => { warn!("timeout on {} - retrying", grpc_source); - break 'recv_loop ConnectionState::WaitReconnect(attempt); + break 'recv_loop ConnectionState::WaitReconnect(0); } - } + } // -- END match } // -- END receive loop } } // -- END match From 4ffe39849fea54f5d858918b5329855013524ac0 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 11 Mar 2024 15:36:56 +0100 Subject: [PATCH 2/7] fix double-increment of attempt --- ...grpc_subscription_autoreconnect_streams.rs | 2 +- src/grpc_subscription_autoreconnect_tasks.rs | 46 +++++++++++++------ 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index cb1b26d..b945e90 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -66,7 +66,7 @@ pub fn create_geyser_reconnecting_stream( } }); - (ConnectionState::Connecting(attempt + 1, connection_task), Message::Connecting(attempt + 1)) + (ConnectionState::Connecting(attempt, connection_task), Message::Connecting(attempt + 1)) } ConnectionState::Connecting(attempt, connection_task) => { diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index a519935..34d3083 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -38,7 +38,8 @@ pub fn create_geyser_autoconnection_task( ) -> (AbortHandle, Receiver) { let (sender, receiver_channel) = tokio::sync::mpsc::channel::(1); - let abort_handle = create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender); + let abort_handle = + create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender); (abort_handle, receiver_channel) } @@ -61,7 +62,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc( loop { state = match state { ConnectionState::NotConnected(attempt) => { - let addr = grpc_source.grpc_addr.clone(); let token = grpc_source.grpc_x_token.clone(); let config = grpc_source.tls_config.clone(); @@ -88,14 +88,22 @@ pub fn create_geyser_autoconnection_task_with_mpsc( .await; match connect_result { - Ok(client) => ConnectionState::Connecting(attempt + 1, client), + Ok(client) => ConnectionState::Connecting(attempt, client), Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError( - attempt + 1, FatalErrorReason::ConfigurationError, - ), Err(GeyserGrpcClientError::MetadataValueError(_)) => { - ConnectionState::FatalError(attempt + 1, FatalErrorReason::ConfigurationError) + attempt + 1, + FatalErrorReason::ConfigurationError, + ), + Err(GeyserGrpcClientError::MetadataValueError(_)) => { + ConnectionState::FatalError( + attempt + 1, + FatalErrorReason::ConfigurationError, + ) } Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => { - ConnectionState::FatalError(attempt + 1, FatalErrorReason::ConfigurationError) + ConnectionState::FatalError( + attempt + 1, + FatalErrorReason::ConfigurationError, + ) } Err(GeyserGrpcClientError::TonicError(tonic_error)) => { warn!( @@ -135,13 +143,16 @@ pub fn create_geyser_autoconnection_task_with_mpsc( match subscribe_result_timeout { Ok(subscribe_result) => { match subscribe_result { - Ok(geyser_stream) => ConnectionState::Ready(geyser_stream), + Ok(geyser_stream) => { + debug!("subscribed to {} after {} failed attempts", grpc_source, attempt); + ConnectionState::Ready(geyser_stream) + }, Err(GeyserGrpcClientError::TonicError(_)) => { - warn!("subscribe failed on {} - retrying", grpc_source); + warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt + 1); ConnectionState::RecoverableConnectionError(attempt + 1) } Err(GeyserGrpcClientError::TonicStatus(_)) => { - warn!("subscribe failed on {} - retrying", grpc_source); + warn!("subscribe failed on {} {} attempts - retrying", grpc_source, attempt + 1); ConnectionState::RecoverableConnectionError(attempt + 1) } // non-recoverable @@ -150,7 +161,10 @@ pub fn create_geyser_autoconnection_task_with_mpsc( "subscribe to {} failed with unrecoverable error: {}", grpc_source, unrecoverable_error ); - ConnectionState::FatalError(attempt + 1, FatalErrorReason::SubscribeError) + ConnectionState::FatalError( + attempt + 1, + FatalErrorReason::SubscribeError, + ) } } } @@ -252,13 +266,19 @@ pub fn create_geyser_autoconnection_task_with_mpsc( } Err(_send_error) => { warn!("downstream receiver closed, message is lost - aborting"); - break 'recv_loop ConnectionState::FatalError(0, FatalErrorReason::DownstreamChannelClosed); + break 'recv_loop ConnectionState::FatalError( + 0, + FatalErrorReason::DownstreamChannelClosed, + ); } } } Err(SendTimeoutError::Closed(_)) => { warn!("downstream receiver closed - aborting"); - break 'recv_loop ConnectionState::FatalError(0, FatalErrorReason::DownstreamChannelClosed); + break 'recv_loop ConnectionState::FatalError( + 0, + FatalErrorReason::DownstreamChannelClosed, + ); } } } From 1fc40f0cd39e6054ab466d7fd7c1a928eeea4a5e Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 11 Mar 2024 15:41:47 +0100 Subject: [PATCH 3/7] task example --- examples/stream_blocks_mainnet_task.rs | 378 +++++++++++++++++++++++++ 1 file changed, 378 insertions(+) create mode 100644 examples/stream_blocks_mainnet_task.rs diff --git a/examples/stream_blocks_mainnet_task.rs b/examples/stream_blocks_mainnet_task.rs new file mode 100644 index 0000000..7a79a38 --- /dev/null +++ b/examples/stream_blocks_mainnet_task.rs @@ -0,0 +1,378 @@ +use futures::{Stream, StreamExt}; +use log::{info, warn}; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; +use std::env; +use std::pin::pin; + +use base64::Engine; +use itertools::Itertools; +use solana_sdk::borsh0_10::try_from_slice_unchecked; +/// This file mocks the core model of the RPC server. +use solana_sdk::compute_budget; +use solana_sdk::compute_budget::ComputeBudgetInstruction; +use solana_sdk::hash::Hash; +use solana_sdk::instruction::CompiledInstruction; +use solana_sdk::message::v0::MessageAddressTableLookup; +use solana_sdk::message::{v0, MessageHeader, VersionedMessage}; +use solana_sdk::pubkey::Pubkey; + +use solana_sdk::signature::Signature; +use solana_sdk::transaction::TransactionError; +use tokio::sync::mpsc::Receiver; +use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock; + +use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{ + create_geyser_autoconnection_task, create_geyser_autoconnection_task_with_mpsc, +}; +use geyser_grpc_connector::grpcmultiplex_fastestwins::{ + create_multiplexed_stream, FromYellowstoneExtractor, +}; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message}; +use tokio::time::{sleep, Duration}; +use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; +use yellowstone_grpc_proto::geyser::SubscribeUpdate; + +fn start_example_block_consumer( + multiplex_stream: impl Stream + Send + 'static, +) { + tokio::spawn(async move { + let mut block_stream = pin!(multiplex_stream); + while let Some(block) = block_stream.next().await { + info!( + "emitted block #{}@{} from multiplexer", + block.slot, block.commitment_config.commitment + ); + } + }); +} + +fn start_example_blockmeta_consumer(mut multiplex_channel: Receiver) { + tokio::spawn(async move { + loop { + match multiplex_channel.recv().await { + Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof { + Some(UpdateOneof::BlockMeta(meta)) => { + info!("emitted blockmeta #{} from multiplexer", meta.slot); + } + None => {} + _ => {} + }, + None => { + warn!("multiplexer channel closed - aborting"); + return; + } + Some(Message::Connecting(_)) => {} + } + } + }); +} + +struct BlockExtractor(CommitmentConfig); + +impl FromYellowstoneExtractor for BlockExtractor { + type Target = ProducedBlock; + fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> { + match update.update_oneof { + Some(UpdateOneof::Block(update_block_message)) => { + let block = map_produced_block(update_block_message, self.0); + Some((block.slot, block)) + } + _ => None, + } + } +} + +pub struct BlockMetaMini { + pub slot: Slot, + pub commitment_config: CommitmentConfig, +} + +struct BlockMetaExtractor(CommitmentConfig); + +impl FromYellowstoneExtractor for BlockMetaExtractor { + type Target = BlockMetaMini; + fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> { + match update.update_oneof { + Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => { + let slot = update_blockmeta_message.slot; + let mini = BlockMetaMini { + slot, + commitment_config: self.0, + }; + Some((slot, mini)) + } + _ => None, + } + } +} + +#[tokio::main] +pub async fn main() { + // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace + tracing_subscriber::fmt::init(); + // console_subscriber::init(); + + let subscribe_blocks = true; + let subscribe_blockmeta = false; + + let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); + let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); + let grpc_addr_blue = env::var("GRPC_ADDR2").expect("need grpc url for blue"); + let grpc_x_token_blue = env::var("GRPC_X_TOKEN2").ok(); + // via toxiproxy + let grpc_addr_toxiproxy = "http://127.0.0.1:10001".to_string(); + + info!( + "Using green on {} ({})", + grpc_addr_green, + grpc_x_token_green.is_some() + ); + info!( + "Using blue on {} ({})", + grpc_addr_blue, + grpc_x_token_blue.is_some() + ); + info!("Using toxiproxy on {}", grpc_addr_toxiproxy); + + let timeouts = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(5), + request_timeout: Duration::from_secs(5), + subscribe_timeout: Duration::from_secs(5), + receive_timeout: Duration::from_secs(5), + }; + + let green_config = + GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); + let blue_config = + GrpcSourceConfig::new(grpc_addr_blue, grpc_x_token_blue, None, timeouts.clone()); + let toxiproxy_config = GrpcSourceConfig::new(grpc_addr_toxiproxy, None, None, timeouts.clone()); + + let (autoconnect_tx, mut blockmeta_rx) = tokio::sync::mpsc::channel(10); + info!("Write BlockMeta stream.."); + let green_stream_ah = create_geyser_autoconnection_task_with_mpsc( + green_config.clone(), + GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), + autoconnect_tx.clone(), + ); + let blue_stream_ah = create_geyser_autoconnection_task_with_mpsc( + blue_config.clone(), + GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), + autoconnect_tx.clone(), + ); + let toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc( + toxiproxy_config.clone(), + GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), + autoconnect_tx.clone(), + ); + start_example_blockmeta_consumer(blockmeta_rx); + + // "infinite" sleep + sleep(Duration::from_secs(1800)).await; +} + +#[derive(Default, Debug, Clone)] +pub struct ProducedBlock { + pub transactions: Vec, + // pub leader_id: Option, + pub blockhash: String, + pub block_height: u64, + pub slot: Slot, + pub parent_slot: Slot, + pub block_time: u64, + pub commitment_config: CommitmentConfig, + pub previous_blockhash: String, + // pub rewards: Option>, +} + +#[derive(Debug, Clone)] +pub struct TransactionInfo { + pub signature: String, + pub err: Option, + pub cu_requested: Option, + pub prioritization_fees: Option, + pub cu_consumed: Option, + pub recent_blockhash: String, + pub message: String, +} + +pub fn map_produced_block( + block: SubscribeUpdateBlock, + commitment_config: CommitmentConfig, +) -> ProducedBlock { + let txs: Vec = block + .transactions + .into_iter() + .filter_map(|tx| { + let Some(meta) = tx.meta else { + return None; + }; + + let Some(transaction) = tx.transaction else { + return None; + }; + + let Some(message) = transaction.message else { + return None; + }; + + let Some(header) = message.header else { + return None; + }; + + let signatures = transaction + .signatures + .into_iter() + .filter_map(|sig| match Signature::try_from(sig) { + Ok(sig) => Some(sig), + Err(_) => { + log::warn!( + "Failed to read signature from transaction in block {} - skipping", + block.blockhash + ); + None + } + }) + .collect_vec(); + + let err = meta.err.map(|x| { + bincode::deserialize::(&x.err) + .expect("TransactionError should be deserialized") + }); + + let signature = signatures[0]; + let compute_units_consumed = meta.compute_units_consumed; + + let message = VersionedMessage::V0(v0::Message { + header: MessageHeader { + num_required_signatures: header.num_required_signatures as u8, + num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8, + num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8, + }, + account_keys: message + .account_keys + .into_iter() + .map(|key| { + let bytes: [u8; 32] = + key.try_into().unwrap_or(Pubkey::default().to_bytes()); + Pubkey::new_from_array(bytes) + }) + .collect(), + recent_blockhash: Hash::new(&message.recent_blockhash), + instructions: message + .instructions + .into_iter() + .map(|ix| CompiledInstruction { + program_id_index: ix.program_id_index as u8, + accounts: ix.accounts, + data: ix.data, + }) + .collect(), + address_table_lookups: message + .address_table_lookups + .into_iter() + .map(|table| { + let bytes: [u8; 32] = table + .account_key + .try_into() + .unwrap_or(Pubkey::default().to_bytes()); + MessageAddressTableLookup { + account_key: Pubkey::new_from_array(bytes), + writable_indexes: table.writable_indexes, + readonly_indexes: table.readonly_indexes, + } + }) + .collect(), + }); + + let legacy_compute_budget: Option<(u32, Option)> = + message.instructions().iter().find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated { + units, + additional_fee, + }) = try_from_slice_unchecked(i.data.as_slice()) + { + if additional_fee > 0 { + return Some(( + units, + Some(((units * 1000) / additional_fee) as u64), + )); + } else { + return Some((units, None)); + } + } + } + None + }); + + let legacy_cu_requested = legacy_compute_budget.map(|x| x.0); + let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None); + + let cu_requested = message + .instructions() + .iter() + .find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(limit); + } + } + None + }) + .or(legacy_cu_requested); + + let prioritization_fees = message + .instructions() + .iter() + .find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(price); + } + } + + None + }) + .or(legacy_prioritization_fees); + + Some(TransactionInfo { + signature: signature.to_string(), + err, + cu_requested, + prioritization_fees, + cu_consumed: compute_units_consumed, + recent_blockhash: message.recent_blockhash().to_string(), + message: base64::engine::general_purpose::STANDARD.encode(message.serialize()), + }) + }) + .collect(); + + // removed rewards + + ProducedBlock { + transactions: txs, + block_height: block + .block_height + .map(|block_height| block_height.block_height) + .unwrap(), + block_time: block.block_time.map(|time| time.timestamp).unwrap() as u64, + blockhash: block.blockhash, + previous_blockhash: block.parent_blockhash, + commitment_config, + // leader_id, + parent_slot: block.parent_slot, + slot: block.slot, + // rewards, + } +} From f1e174c1dc18ce43b9e46c327e9ee644a8edb7b8 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 11 Mar 2024 15:41:55 +0100 Subject: [PATCH 4/7] make attempt 1-based --- src/grpc_subscription_autoreconnect_tasks.rs | 26 ++++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 34d3083..b196a29 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -11,6 +11,7 @@ use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::Status; +// 1-based counter type Attempt = u32; enum ConnectionState>, F: Interceptor> { @@ -56,7 +57,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( // task will be aborted when downstream receiver gets dropped let jh_geyser_task = tokio::spawn(async move { - let mut state = ConnectionState::NotConnected(0); + let mut state = ConnectionState::NotConnected(1); let mut messages_forwarded = 0; loop { @@ -68,13 +69,13 @@ pub fn create_geyser_autoconnection_task_with_mpsc( let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout); let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout); log!( - if attempt > 0 { + if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt {} to {}", - attempt + 1, + attempt, addr ); let connect_result = GeyserGrpcClient::connect_with_timeout( @@ -144,15 +145,17 @@ pub fn create_geyser_autoconnection_task_with_mpsc( Ok(subscribe_result) => { match subscribe_result { Ok(geyser_stream) => { - debug!("subscribed to {} after {} failed attempts", grpc_source, attempt); + if attempt > 1 { + debug!("subscribed to {} after {} failed attempts", grpc_source, attempt); + } ConnectionState::Ready(geyser_stream) }, Err(GeyserGrpcClientError::TonicError(_)) => { - warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt + 1); + warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt); ConnectionState::RecoverableConnectionError(attempt + 1) } Err(GeyserGrpcClientError::TonicStatus(_)) => { - warn!("subscribe failed on {} {} attempts - retrying", grpc_source, attempt + 1); + warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt); ConnectionState::RecoverableConnectionError(attempt + 1) } // non-recoverable @@ -161,10 +164,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( "subscribe to {} failed with unrecoverable error: {}", grpc_source, unrecoverable_error ); - ConnectionState::FatalError( - attempt + 1, - FatalErrorReason::SubscribeError, - ) + ConnectionState::FatalError(attempt + 1, FatalErrorReason::SubscribeError) } } } @@ -285,15 +285,15 @@ pub fn create_geyser_autoconnection_task_with_mpsc( Ok(Some(Err(tonic_status))) => { // all tonic errors are recoverable warn!("error on {} - retrying: {:?}", grpc_source, tonic_status); - break 'recv_loop ConnectionState::WaitReconnect(0); + break 'recv_loop ConnectionState::WaitReconnect(1); } Ok(None) => { warn!("geyser stream closed on {} - retrying", grpc_source); - break 'recv_loop ConnectionState::WaitReconnect(0); + break 'recv_loop ConnectionState::WaitReconnect(1); } Err(_elapsed) => { warn!("timeout on {} - retrying", grpc_source); - break 'recv_loop ConnectionState::WaitReconnect(0); + break 'recv_loop ConnectionState::WaitReconnect(1); } } // -- END match } // -- END receive loop From 26bc7a36834e342eea4147ac18bb2e0a21611791 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 11 Mar 2024 15:45:28 +0100 Subject: [PATCH 5/7] make attempt counter 1-based --- src/grpc_subscription_autoreconnect_streams.rs | 14 +++++++------- src/grpc_subscription_autoreconnect_tasks.rs | 5 +---- src/lib.rs | 1 + 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index b945e90..2098dce 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -22,7 +22,7 @@ pub fn create_geyser_reconnecting_stream( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, ) -> impl Stream { - let mut state = ConnectionState::NotConnected(0); + let mut state = ConnectionState::NotConnected(1); // in case of cancellation, we restart from here: // thus we want to keep the progression in a state object outside the stream! makro @@ -42,7 +42,7 @@ pub fn create_geyser_reconnecting_stream( let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout); let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout); let subscribe_filter = subscribe_filter.clone(); - log!(if attempt >= 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt + 1, addr); + log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr); async move { let connect_result = GeyserGrpcClient::connect_with_timeout( @@ -66,7 +66,7 @@ pub fn create_geyser_reconnecting_stream( } }); - (ConnectionState::Connecting(attempt, connection_task), Message::Connecting(attempt + 1)) + (ConnectionState::Connecting(attempt + 1, connection_task), Message::Connecting(attempt + 1)) } ConnectionState::Connecting(attempt, connection_task) => { @@ -77,7 +77,7 @@ pub fn create_geyser_reconnecting_stream( Ok(Err(geyser_error)) => { // ATM we consider all errors recoverable warn!("subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error); - (ConnectionState::WaitReconnect(attempt + 1), Message::Connecting(attempt)) + (ConnectionState::WaitReconnect(attempt + 1), Message::Connecting(attempt + 1)) }, Err(geyser_grpc_task_error) => { panic!("task aborted - should not happen :{geyser_grpc_task_error}"); @@ -96,17 +96,17 @@ pub fn create_geyser_reconnecting_stream( Ok(Some(Err(tonic_status))) => { // ATM we consider all errors recoverable warn!("error on {} - retrying: {:?}", grpc_source, tonic_status); - (ConnectionState::WaitReconnect(0), Message::Connecting(0)) + (ConnectionState::WaitReconnect(1), Message::Connecting(1)) } Ok(None) => { // should not arrive here, Mean the stream close. warn!("geyser stream closed on {} - retrying", grpc_source); - (ConnectionState::WaitReconnect(0), Message::Connecting(0)) + (ConnectionState::WaitReconnect(1), Message::Connecting(1)) } Err(_elapsed) => { // timeout warn!("geyser stream timeout on {} - retrying", grpc_source); - (ConnectionState::WaitReconnect(0), Message::Connecting(0)) + (ConnectionState::WaitReconnect(1), Message::Connecting(1)) } } diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index b196a29..6aa725f 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,4 +1,4 @@ -use crate::{GrpcSourceConfig, Message}; +use crate::{Attempt, GrpcSourceConfig, Message}; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; use std::time::Duration; @@ -11,9 +11,6 @@ use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::Status; -// 1-based counter -type Attempt = u32; - enum ConnectionState>, F: Interceptor> { NotConnected(Attempt), // connected but not subscribed diff --git a/src/lib.rs b/src/lib.rs index 0cf09c1..a5c1690 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ pub mod grpc_subscription_autoreconnect_tasks; pub mod grpcmultiplex_fastestwins; mod obfuscate; +// 1-based attempt counter type Attempt = u32; // wraps payload and status messages From 37ed49c2101987be48fce961ad55af8875c55e58 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 11 Mar 2024 15:50:43 +0100 Subject: [PATCH 6/7] clippy --- examples/stream_blocks_autoconnect.rs | 1 - examples/stream_blocks_mainnet_task.rs | 27 ++++---------------- examples/stream_blocks_single.rs | 1 + src/channel_plugger.rs | 9 ++++--- src/grpc_subscription_autoreconnect_tasks.rs | 22 ++++++++++++---- 5 files changed, 28 insertions(+), 32 deletions(-) diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs index 3e24cb0..6b3a40c 100644 --- a/examples/stream_blocks_autoconnect.rs +++ b/examples/stream_blocks_autoconnect.rs @@ -1,4 +1,3 @@ -use futures::StreamExt; use log::info; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; diff --git a/examples/stream_blocks_mainnet_task.rs b/examples/stream_blocks_mainnet_task.rs index 7a79a38..eb3ea6e 100644 --- a/examples/stream_blocks_mainnet_task.rs +++ b/examples/stream_blocks_mainnet_task.rs @@ -1,4 +1,4 @@ -use futures::{Stream, StreamExt}; +use futures::Stream; use log::{info, warn}; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; @@ -33,20 +33,6 @@ use tokio::time::{sleep, Duration}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; -fn start_example_block_consumer( - multiplex_stream: impl Stream + Send + 'static, -) { - tokio::spawn(async move { - let mut block_stream = pin!(multiplex_stream); - while let Some(block) = block_stream.next().await { - info!( - "emitted block #{}@{} from multiplexer", - block.slot, block.commitment_config.commitment - ); - } - }); -} - fn start_example_blockmeta_consumer(mut multiplex_channel: Receiver) { tokio::spawn(async move { loop { @@ -113,9 +99,6 @@ pub async fn main() { tracing_subscriber::fmt::init(); // console_subscriber::init(); - let subscribe_blocks = true; - let subscribe_blockmeta = false; - let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); let grpc_addr_blue = env::var("GRPC_ADDR2").expect("need grpc url for blue"); @@ -148,19 +131,19 @@ pub async fn main() { GrpcSourceConfig::new(grpc_addr_blue, grpc_x_token_blue, None, timeouts.clone()); let toxiproxy_config = GrpcSourceConfig::new(grpc_addr_toxiproxy, None, None, timeouts.clone()); - let (autoconnect_tx, mut blockmeta_rx) = tokio::sync::mpsc::channel(10); + let (autoconnect_tx, blockmeta_rx) = tokio::sync::mpsc::channel(10); info!("Write BlockMeta stream.."); - let green_stream_ah = create_geyser_autoconnection_task_with_mpsc( + let _green_stream_ah = create_geyser_autoconnection_task_with_mpsc( green_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), autoconnect_tx.clone(), ); - let blue_stream_ah = create_geyser_autoconnection_task_with_mpsc( + let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc( blue_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), autoconnect_tx.clone(), ); - let toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc( + let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc( toxiproxy_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), autoconnect_tx.clone(), diff --git a/examples/stream_blocks_single.rs b/examples/stream_blocks_single.rs index 4abc002..47f2ff6 100644 --- a/examples/stream_blocks_single.rs +++ b/examples/stream_blocks_single.rs @@ -14,6 +14,7 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::prost::Message as _; +#[allow(dead_code)] fn start_example_blockmini_consumer( multiplex_stream: impl Stream + Send + 'static, ) { diff --git a/src/channel_plugger.rs b/src/channel_plugger.rs index 9ba1628..ee713a6 100644 --- a/src/channel_plugger.rs +++ b/src/channel_plugger.rs @@ -1,7 +1,4 @@ -use log::{debug, info, warn}; -use std::time::Duration; -use tokio::sync::broadcast::error::RecvError; -use tokio::sync::mpsc::error::SendTimeoutError; +use log::debug; /// usage: see plug_pattern test pub fn spawn_broadcast_channel_plug( @@ -41,6 +38,10 @@ pub fn spawn_plugger_mpcs_to_broadcast( #[cfg(test)] mod tests { use super::*; + use log::{info, warn}; + use std::time::Duration; + use tokio::sync::broadcast::error::RecvError; + use tokio::sync::mpsc::error::SendTimeoutError; use tokio::time::{sleep, timeout}; #[tokio::test] diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 6aa725f..4413b97 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -143,16 +143,25 @@ pub fn create_geyser_autoconnection_task_with_mpsc( match subscribe_result { Ok(geyser_stream) => { if attempt > 1 { - debug!("subscribed to {} after {} failed attempts", grpc_source, attempt); + debug!( + "subscribed to {} after {} failed attempts", + grpc_source, attempt + ); } ConnectionState::Ready(geyser_stream) - }, + } Err(GeyserGrpcClientError::TonicError(_)) => { - warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt); + warn!( + "subscribe failed on {} after {} attempts - retrying", + grpc_source, attempt + ); ConnectionState::RecoverableConnectionError(attempt + 1) } Err(GeyserGrpcClientError::TonicStatus(_)) => { - warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt); + warn!( + "subscribe failed on {} after {} attempts - retrying", + grpc_source, attempt + ); ConnectionState::RecoverableConnectionError(attempt + 1) } // non-recoverable @@ -161,7 +170,10 @@ pub fn create_geyser_autoconnection_task_with_mpsc( "subscribe to {} failed with unrecoverable error: {}", grpc_source, unrecoverable_error ); - ConnectionState::FatalError(attempt + 1, FatalErrorReason::SubscribeError) + ConnectionState::FatalError( + attempt + 1, + FatalErrorReason::SubscribeError, + ) } } } From 813931f210411ad114fff888b48d10b3f1df619f Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 11 Mar 2024 17:56:16 +0100 Subject: [PATCH 7/7] fix stream message attempt counter --- src/grpc_subscription_autoreconnect_streams.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index 2098dce..a6d0506 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -66,7 +66,7 @@ pub fn create_geyser_reconnecting_stream( } }); - (ConnectionState::Connecting(attempt + 1, connection_task), Message::Connecting(attempt + 1)) + (ConnectionState::Connecting(attempt + 1, connection_task), Message::Connecting(attempt)) } ConnectionState::Connecting(attempt, connection_task) => { @@ -77,7 +77,7 @@ pub fn create_geyser_reconnecting_stream( Ok(Err(geyser_error)) => { // ATM we consider all errors recoverable warn!("subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error); - (ConnectionState::WaitReconnect(attempt + 1), Message::Connecting(attempt + 1)) + (ConnectionState::WaitReconnect(attempt + 1), Message::Connecting(attempt)) }, Err(geyser_grpc_task_error) => { panic!("task aborted - should not happen :{geyser_grpc_task_error}");