diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs index 3e24cb0..6b3a40c 100644 --- a/examples/stream_blocks_autoconnect.rs +++ b/examples/stream_blocks_autoconnect.rs @@ -1,4 +1,3 @@ -use futures::StreamExt; use log::info; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; diff --git a/examples/stream_blocks_mainnet.rs b/examples/stream_blocks_mainnet_stream.rs similarity index 100% rename from examples/stream_blocks_mainnet.rs rename to examples/stream_blocks_mainnet_stream.rs diff --git a/examples/stream_blocks_mainnet_task.rs b/examples/stream_blocks_mainnet_task.rs new file mode 100644 index 0000000..eb3ea6e --- /dev/null +++ b/examples/stream_blocks_mainnet_task.rs @@ -0,0 +1,361 @@ +use futures::Stream; +use log::{info, warn}; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; +use std::env; +use std::pin::pin; + +use base64::Engine; +use itertools::Itertools; +use solana_sdk::borsh0_10::try_from_slice_unchecked; +/// This file mocks the core model of the RPC server. +use solana_sdk::compute_budget; +use solana_sdk::compute_budget::ComputeBudgetInstruction; +use solana_sdk::hash::Hash; +use solana_sdk::instruction::CompiledInstruction; +use solana_sdk::message::v0::MessageAddressTableLookup; +use solana_sdk::message::{v0, MessageHeader, VersionedMessage}; +use solana_sdk::pubkey::Pubkey; + +use solana_sdk::signature::Signature; +use solana_sdk::transaction::TransactionError; +use tokio::sync::mpsc::Receiver; +use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock; + +use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{ + create_geyser_autoconnection_task, create_geyser_autoconnection_task_with_mpsc, +}; +use geyser_grpc_connector::grpcmultiplex_fastestwins::{ + create_multiplexed_stream, FromYellowstoneExtractor, +}; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message}; +use tokio::time::{sleep, Duration}; +use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; +use yellowstone_grpc_proto::geyser::SubscribeUpdate; + +fn start_example_blockmeta_consumer(mut multiplex_channel: Receiver) { + tokio::spawn(async move { + loop { + match multiplex_channel.recv().await { + Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof { + Some(UpdateOneof::BlockMeta(meta)) => { + info!("emitted blockmeta #{} from multiplexer", meta.slot); + } + None => {} + _ => {} + }, + None => { + warn!("multiplexer channel closed - aborting"); + return; + } + Some(Message::Connecting(_)) => {} + } + } + }); +} + +struct BlockExtractor(CommitmentConfig); + +impl FromYellowstoneExtractor for BlockExtractor { + type Target = ProducedBlock; + fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> { + match update.update_oneof { + Some(UpdateOneof::Block(update_block_message)) => { + let block = map_produced_block(update_block_message, self.0); + Some((block.slot, block)) + } + _ => None, + } + } +} + +pub struct BlockMetaMini { + pub slot: Slot, + pub commitment_config: CommitmentConfig, +} + +struct BlockMetaExtractor(CommitmentConfig); + +impl FromYellowstoneExtractor for BlockMetaExtractor { + type Target = BlockMetaMini; + fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> { + match update.update_oneof { + Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => { + let slot = update_blockmeta_message.slot; + let mini = BlockMetaMini { + slot, + commitment_config: self.0, + }; + Some((slot, mini)) + } + _ => None, + } + } +} + +#[tokio::main] +pub async fn main() { + // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace + tracing_subscriber::fmt::init(); + // console_subscriber::init(); + + let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); + let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); + let grpc_addr_blue = env::var("GRPC_ADDR2").expect("need grpc url for blue"); + let grpc_x_token_blue = env::var("GRPC_X_TOKEN2").ok(); + // via toxiproxy + let grpc_addr_toxiproxy = "http://127.0.0.1:10001".to_string(); + + info!( + "Using green on {} ({})", + grpc_addr_green, + grpc_x_token_green.is_some() + ); + info!( + "Using blue on {} ({})", + grpc_addr_blue, + grpc_x_token_blue.is_some() + ); + info!("Using toxiproxy on {}", grpc_addr_toxiproxy); + + let timeouts = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(5), + request_timeout: Duration::from_secs(5), + subscribe_timeout: Duration::from_secs(5), + receive_timeout: Duration::from_secs(5), + }; + + let green_config = + GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); + let blue_config = + GrpcSourceConfig::new(grpc_addr_blue, grpc_x_token_blue, None, timeouts.clone()); + let toxiproxy_config = GrpcSourceConfig::new(grpc_addr_toxiproxy, None, None, timeouts.clone()); + + let (autoconnect_tx, blockmeta_rx) = tokio::sync::mpsc::channel(10); + info!("Write BlockMeta stream.."); + let _green_stream_ah = create_geyser_autoconnection_task_with_mpsc( + green_config.clone(), + GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), + autoconnect_tx.clone(), + ); + let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc( + blue_config.clone(), + GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), + autoconnect_tx.clone(), + ); + let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc( + toxiproxy_config.clone(), + GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), + autoconnect_tx.clone(), + ); + start_example_blockmeta_consumer(blockmeta_rx); + + // "infinite" sleep + sleep(Duration::from_secs(1800)).await; +} + +#[derive(Default, Debug, Clone)] +pub struct ProducedBlock { + pub transactions: Vec, + // pub leader_id: Option, + pub blockhash: String, + pub block_height: u64, + pub slot: Slot, + pub parent_slot: Slot, + pub block_time: u64, + pub commitment_config: CommitmentConfig, + pub previous_blockhash: String, + // pub rewards: Option>, +} + +#[derive(Debug, Clone)] +pub struct TransactionInfo { + pub signature: String, + pub err: Option, + pub cu_requested: Option, + pub prioritization_fees: Option, + pub cu_consumed: Option, + pub recent_blockhash: String, + pub message: String, +} + +pub fn map_produced_block( + block: SubscribeUpdateBlock, + commitment_config: CommitmentConfig, +) -> ProducedBlock { + let txs: Vec = block + .transactions + .into_iter() + .filter_map(|tx| { + let Some(meta) = tx.meta else { + return None; + }; + + let Some(transaction) = tx.transaction else { + return None; + }; + + let Some(message) = transaction.message else { + return None; + }; + + let Some(header) = message.header else { + return None; + }; + + let signatures = transaction + .signatures + .into_iter() + .filter_map(|sig| match Signature::try_from(sig) { + Ok(sig) => Some(sig), + Err(_) => { + log::warn!( + "Failed to read signature from transaction in block {} - skipping", + block.blockhash + ); + None + } + }) + .collect_vec(); + + let err = meta.err.map(|x| { + bincode::deserialize::(&x.err) + .expect("TransactionError should be deserialized") + }); + + let signature = signatures[0]; + let compute_units_consumed = meta.compute_units_consumed; + + let message = VersionedMessage::V0(v0::Message { + header: MessageHeader { + num_required_signatures: header.num_required_signatures as u8, + num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8, + num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8, + }, + account_keys: message + .account_keys + .into_iter() + .map(|key| { + let bytes: [u8; 32] = + key.try_into().unwrap_or(Pubkey::default().to_bytes()); + Pubkey::new_from_array(bytes) + }) + .collect(), + recent_blockhash: Hash::new(&message.recent_blockhash), + instructions: message + .instructions + .into_iter() + .map(|ix| CompiledInstruction { + program_id_index: ix.program_id_index as u8, + accounts: ix.accounts, + data: ix.data, + }) + .collect(), + address_table_lookups: message + .address_table_lookups + .into_iter() + .map(|table| { + let bytes: [u8; 32] = table + .account_key + .try_into() + .unwrap_or(Pubkey::default().to_bytes()); + MessageAddressTableLookup { + account_key: Pubkey::new_from_array(bytes), + writable_indexes: table.writable_indexes, + readonly_indexes: table.readonly_indexes, + } + }) + .collect(), + }); + + let legacy_compute_budget: Option<(u32, Option)> = + message.instructions().iter().find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated { + units, + additional_fee, + }) = try_from_slice_unchecked(i.data.as_slice()) + { + if additional_fee > 0 { + return Some(( + units, + Some(((units * 1000) / additional_fee) as u64), + )); + } else { + return Some((units, None)); + } + } + } + None + }); + + let legacy_cu_requested = legacy_compute_budget.map(|x| x.0); + let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None); + + let cu_requested = message + .instructions() + .iter() + .find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(limit); + } + } + None + }) + .or(legacy_cu_requested); + + let prioritization_fees = message + .instructions() + .iter() + .find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(price); + } + } + + None + }) + .or(legacy_prioritization_fees); + + Some(TransactionInfo { + signature: signature.to_string(), + err, + cu_requested, + prioritization_fees, + cu_consumed: compute_units_consumed, + recent_blockhash: message.recent_blockhash().to_string(), + message: base64::engine::general_purpose::STANDARD.encode(message.serialize()), + }) + }) + .collect(); + + // removed rewards + + ProducedBlock { + transactions: txs, + block_height: block + .block_height + .map(|block_height| block_height.block_height) + .unwrap(), + block_time: block.block_time.map(|time| time.timestamp).unwrap() as u64, + blockhash: block.blockhash, + previous_blockhash: block.parent_blockhash, + commitment_config, + // leader_id, + parent_slot: block.parent_slot, + slot: block.slot, + // rewards, + } +} diff --git a/examples/stream_blocks_single.rs b/examples/stream_blocks_single.rs index 4abc002..47f2ff6 100644 --- a/examples/stream_blocks_single.rs +++ b/examples/stream_blocks_single.rs @@ -14,6 +14,7 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::prost::Message as _; +#[allow(dead_code)] fn start_example_blockmini_consumer( multiplex_stream: impl Stream + Send + 'static, ) { diff --git a/src/channel_plugger.rs b/src/channel_plugger.rs index 9ba1628..ee713a6 100644 --- a/src/channel_plugger.rs +++ b/src/channel_plugger.rs @@ -1,7 +1,4 @@ -use log::{debug, info, warn}; -use std::time::Duration; -use tokio::sync::broadcast::error::RecvError; -use tokio::sync::mpsc::error::SendTimeoutError; +use log::debug; /// usage: see plug_pattern test pub fn spawn_broadcast_channel_plug( @@ -41,6 +38,10 @@ pub fn spawn_plugger_mpcs_to_broadcast( #[cfg(test)] mod tests { use super::*; + use log::{info, warn}; + use std::time::Duration; + use tokio::sync::broadcast::error::RecvError; + use tokio::sync::mpsc::error::SendTimeoutError; use tokio::time::{sleep, timeout}; #[tokio::test] diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index e070292..a6d0506 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -12,7 +12,7 @@ use yellowstone_grpc_proto::tonic::Status; enum ConnectionState>> { NotConnected(Attempt), Connecting(Attempt, JoinHandle>), - Ready(Attempt, S), + Ready(S), WaitReconnect(Attempt), } @@ -22,7 +22,7 @@ pub fn create_geyser_reconnecting_stream( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, ) -> impl Stream { - let mut state = ConnectionState::NotConnected(0); + let mut state = ConnectionState::NotConnected(1); // in case of cancellation, we restart from here: // thus we want to keep the progression in a state object outside the stream! makro @@ -32,8 +32,7 @@ pub fn create_geyser_reconnecting_stream( (state, yield_value) = match state { - ConnectionState::NotConnected(mut attempt) => { - attempt += 1; + ConnectionState::NotConnected(attempt) => { let connection_task = tokio::spawn({ let addr = grpc_source.grpc_addr.clone(); @@ -67,18 +66,18 @@ pub fn create_geyser_reconnecting_stream( } }); - (ConnectionState::Connecting(attempt, connection_task), Message::Connecting(attempt)) + (ConnectionState::Connecting(attempt + 1, connection_task), Message::Connecting(attempt)) } ConnectionState::Connecting(attempt, connection_task) => { let subscribe_result = connection_task.await; match subscribe_result { - Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(attempt, subscribed_stream), Message::Connecting(attempt)), + Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(subscribed_stream), Message::Connecting(attempt)), Ok(Err(geyser_error)) => { // ATM we consider all errors recoverable warn!("subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error); - (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) + (ConnectionState::WaitReconnect(attempt + 1), Message::Connecting(attempt)) }, Err(geyser_grpc_task_error) => { panic!("task aborted - should not happen :{geyser_grpc_task_error}"); @@ -87,27 +86,27 @@ pub fn create_geyser_reconnecting_stream( } - ConnectionState::Ready(attempt, mut geyser_stream) => { + ConnectionState::Ready(mut geyser_stream) => { let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await { Ok(Some(Ok(update_message))) => { trace!("> recv update message from {}", grpc_source); - (ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message))) + (ConnectionState::Ready(geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message))) } Ok(Some(Err(tonic_status))) => { // ATM we consider all errors recoverable warn!("error on {} - retrying: {:?}", grpc_source, tonic_status); - (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) + (ConnectionState::WaitReconnect(1), Message::Connecting(1)) } Ok(None) => { // should not arrive here, Mean the stream close. warn!("geyser stream closed on {} - retrying", grpc_source); - (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) + (ConnectionState::WaitReconnect(1), Message::Connecting(1)) } Err(_elapsed) => { // timeout warn!("geyser stream timeout on {} - retrying", grpc_source); - (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) + (ConnectionState::WaitReconnect(1), Message::Connecting(1)) } } diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index bf786e5..4413b97 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,4 +1,4 @@ -use crate::{GrpcSourceConfig, Message}; +use crate::{Attempt, GrpcSourceConfig, Message}; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; use std::time::Duration; @@ -11,12 +11,11 @@ use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::Status; -type Attempt = u32; - enum ConnectionState>, F: Interceptor> { NotConnected(Attempt), - Connected(Attempt, GeyserGrpcClient), - Ready(Attempt, S), + // connected but not subscribed + Connecting(Attempt, GeyserGrpcClient), + Ready(S), // error states RecoverableConnectionError(Attempt), // non-recoverable error @@ -37,7 +36,8 @@ pub fn create_geyser_autoconnection_task( ) -> (AbortHandle, Receiver) { let (sender, receiver_channel) = tokio::sync::mpsc::channel::(1); - let abort_handle = create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender); + let abort_handle = + create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender); (abort_handle, receiver_channel) } @@ -54,14 +54,12 @@ pub fn create_geyser_autoconnection_task_with_mpsc( // task will be aborted when downstream receiver gets dropped let jh_geyser_task = tokio::spawn(async move { - let mut state = ConnectionState::NotConnected(0); + let mut state = ConnectionState::NotConnected(1); let mut messages_forwarded = 0; loop { state = match state { - ConnectionState::NotConnected(mut attempt) => { - attempt += 1; - + ConnectionState::NotConnected(attempt) => { let addr = grpc_source.grpc_addr.clone(); let token = grpc_source.grpc_x_token.clone(); let config = grpc_source.tls_config.clone(); @@ -73,7 +71,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( } else { Level::Debug }, - "Connecting attempt #{} to {}", + "Connecting attempt {} to {}", attempt, addr ); @@ -88,20 +86,20 @@ pub fn create_geyser_autoconnection_task_with_mpsc( .await; match connect_result { - Ok(client) => ConnectionState::Connected(attempt, client), + Ok(client) => ConnectionState::Connecting(attempt, client), Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError( - attempt, + attempt + 1, FatalErrorReason::ConfigurationError, ), Err(GeyserGrpcClientError::MetadataValueError(_)) => { ConnectionState::FatalError( - attempt, + attempt + 1, FatalErrorReason::ConfigurationError, ) } Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => { ConnectionState::FatalError( - attempt, + attempt + 1, FatalErrorReason::ConfigurationError, ) } @@ -110,25 +108,25 @@ pub fn create_geyser_autoconnection_task_with_mpsc( "connect failed on {} - aborting: {:?}", grpc_source, tonic_error ); - ConnectionState::FatalError(attempt, FatalErrorReason::NetworkError) + ConnectionState::FatalError(attempt + 1, FatalErrorReason::NetworkError) } Err(GeyserGrpcClientError::TonicStatus(tonic_status)) => { warn!( "connect failed on {} - retrying: {:?}", grpc_source, tonic_status ); - ConnectionState::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt + 1) } Err(GeyserGrpcClientError::SubscribeSendError(send_error)) => { warn!( "connect failed with send error on {} - retrying: {:?}", grpc_source, send_error ); - ConnectionState::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt + 1) } } } - ConnectionState::Connected(attempt, mut client) => { + ConnectionState::Connecting(attempt, mut client) => { let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout); let subscribe_filter = subscribe_filter.clone(); @@ -143,14 +141,28 @@ pub fn create_geyser_autoconnection_task_with_mpsc( match subscribe_result_timeout { Ok(subscribe_result) => { match subscribe_result { - Ok(geyser_stream) => ConnectionState::Ready(attempt, geyser_stream), + Ok(geyser_stream) => { + if attempt > 1 { + debug!( + "subscribed to {} after {} failed attempts", + grpc_source, attempt + ); + } + ConnectionState::Ready(geyser_stream) + } Err(GeyserGrpcClientError::TonicError(_)) => { - warn!("subscribe failed on {} - retrying", grpc_source); - ConnectionState::RecoverableConnectionError(attempt) + warn!( + "subscribe failed on {} after {} attempts - retrying", + grpc_source, attempt + ); + ConnectionState::RecoverableConnectionError(attempt + 1) } Err(GeyserGrpcClientError::TonicStatus(_)) => { - warn!("subscribe failed on {} - retrying", grpc_source); - ConnectionState::RecoverableConnectionError(attempt) + warn!( + "subscribe failed on {} after {} attempts - retrying", + grpc_source, attempt + ); + ConnectionState::RecoverableConnectionError(attempt + 1) } // non-recoverable Err(unrecoverable_error) => { @@ -159,7 +171,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( grpc_source, unrecoverable_error ); ConnectionState::FatalError( - attempt, + attempt + 1, FatalErrorReason::SubscribeError, ) } @@ -170,7 +182,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( "subscribe failed with timeout on {} - retrying", grpc_source ); - ConnectionState::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt + 1) } } } @@ -210,7 +222,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( sleep(Duration::from_secs_f32(backoff_secs)).await; ConnectionState::NotConnected(attempt) } - ConnectionState::Ready(attempt, mut geyser_stream) => { + ConnectionState::Ready(mut geyser_stream) => { let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); 'recv_loop: loop { match timeout( @@ -264,7 +276,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( Err(_send_error) => { warn!("downstream receiver closed, message is lost - aborting"); break 'recv_loop ConnectionState::FatalError( - attempt, + 0, FatalErrorReason::DownstreamChannelClosed, ); } @@ -273,7 +285,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( Err(SendTimeoutError::Closed(_)) => { warn!("downstream receiver closed - aborting"); break 'recv_loop ConnectionState::FatalError( - attempt, + 0, FatalErrorReason::DownstreamChannelClosed, ); } @@ -282,17 +294,17 @@ pub fn create_geyser_autoconnection_task_with_mpsc( Ok(Some(Err(tonic_status))) => { // all tonic errors are recoverable warn!("error on {} - retrying: {:?}", grpc_source, tonic_status); - break 'recv_loop ConnectionState::WaitReconnect(attempt); + break 'recv_loop ConnectionState::WaitReconnect(1); } Ok(None) => { warn!("geyser stream closed on {} - retrying", grpc_source); - break 'recv_loop ConnectionState::WaitReconnect(attempt); + break 'recv_loop ConnectionState::WaitReconnect(1); } Err(_elapsed) => { warn!("timeout on {} - retrying", grpc_source); - break 'recv_loop ConnectionState::WaitReconnect(attempt); + break 'recv_loop ConnectionState::WaitReconnect(1); } - } + } // -- END match } // -- END receive loop } } // -- END match diff --git a/src/lib.rs b/src/lib.rs index 0cf09c1..a5c1690 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ pub mod grpc_subscription_autoreconnect_tasks; pub mod grpcmultiplex_fastestwins; mod obfuscate; +// 1-based attempt counter type Attempt = u32; // wraps payload and status messages