From 37ed49c2101987be48fce961ad55af8875c55e58 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 11 Mar 2024 15:50:43 +0100 Subject: [PATCH] clippy --- examples/stream_blocks_autoconnect.rs | 1 - examples/stream_blocks_mainnet_task.rs | 27 ++++---------------- examples/stream_blocks_single.rs | 1 + src/channel_plugger.rs | 9 ++++--- src/grpc_subscription_autoreconnect_tasks.rs | 22 ++++++++++++---- 5 files changed, 28 insertions(+), 32 deletions(-) diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs index 3e24cb0..6b3a40c 100644 --- a/examples/stream_blocks_autoconnect.rs +++ b/examples/stream_blocks_autoconnect.rs @@ -1,4 +1,3 @@ -use futures::StreamExt; use log::info; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; diff --git a/examples/stream_blocks_mainnet_task.rs b/examples/stream_blocks_mainnet_task.rs index 7a79a38..eb3ea6e 100644 --- a/examples/stream_blocks_mainnet_task.rs +++ b/examples/stream_blocks_mainnet_task.rs @@ -1,4 +1,4 @@ -use futures::{Stream, StreamExt}; +use futures::Stream; use log::{info, warn}; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; @@ -33,20 +33,6 @@ use tokio::time::{sleep, Duration}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; -fn start_example_block_consumer( - multiplex_stream: impl Stream + Send + 'static, -) { - tokio::spawn(async move { - let mut block_stream = pin!(multiplex_stream); - while let Some(block) = block_stream.next().await { - info!( - "emitted block #{}@{} from multiplexer", - block.slot, block.commitment_config.commitment - ); - } - }); -} - fn start_example_blockmeta_consumer(mut multiplex_channel: Receiver) { tokio::spawn(async move { loop { @@ -113,9 +99,6 @@ pub async fn main() { tracing_subscriber::fmt::init(); // console_subscriber::init(); - let subscribe_blocks = true; - let subscribe_blockmeta = false; - let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); let grpc_addr_blue = env::var("GRPC_ADDR2").expect("need grpc url for blue"); @@ -148,19 +131,19 @@ pub async fn main() { GrpcSourceConfig::new(grpc_addr_blue, grpc_x_token_blue, None, timeouts.clone()); let toxiproxy_config = GrpcSourceConfig::new(grpc_addr_toxiproxy, None, None, timeouts.clone()); - let (autoconnect_tx, mut blockmeta_rx) = tokio::sync::mpsc::channel(10); + let (autoconnect_tx, blockmeta_rx) = tokio::sync::mpsc::channel(10); info!("Write BlockMeta stream.."); - let green_stream_ah = create_geyser_autoconnection_task_with_mpsc( + let _green_stream_ah = create_geyser_autoconnection_task_with_mpsc( green_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), autoconnect_tx.clone(), ); - let blue_stream_ah = create_geyser_autoconnection_task_with_mpsc( + let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc( blue_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), autoconnect_tx.clone(), ); - let toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc( + let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc( toxiproxy_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), autoconnect_tx.clone(), diff --git a/examples/stream_blocks_single.rs b/examples/stream_blocks_single.rs index 4abc002..47f2ff6 100644 --- a/examples/stream_blocks_single.rs +++ b/examples/stream_blocks_single.rs @@ -14,6 +14,7 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::prost::Message as _; +#[allow(dead_code)] fn start_example_blockmini_consumer( multiplex_stream: impl Stream + Send + 'static, ) { diff --git a/src/channel_plugger.rs b/src/channel_plugger.rs index 9ba1628..ee713a6 100644 --- a/src/channel_plugger.rs +++ b/src/channel_plugger.rs @@ -1,7 +1,4 @@ -use log::{debug, info, warn}; -use std::time::Duration; -use tokio::sync::broadcast::error::RecvError; -use tokio::sync::mpsc::error::SendTimeoutError; +use log::debug; /// usage: see plug_pattern test pub fn spawn_broadcast_channel_plug( @@ -41,6 +38,10 @@ pub fn spawn_plugger_mpcs_to_broadcast( #[cfg(test)] mod tests { use super::*; + use log::{info, warn}; + use std::time::Duration; + use tokio::sync::broadcast::error::RecvError; + use tokio::sync::mpsc::error::SendTimeoutError; use tokio::time::{sleep, timeout}; #[tokio::test] diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 6aa725f..4413b97 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -143,16 +143,25 @@ pub fn create_geyser_autoconnection_task_with_mpsc( match subscribe_result { Ok(geyser_stream) => { if attempt > 1 { - debug!("subscribed to {} after {} failed attempts", grpc_source, attempt); + debug!( + "subscribed to {} after {} failed attempts", + grpc_source, attempt + ); } ConnectionState::Ready(geyser_stream) - }, + } Err(GeyserGrpcClientError::TonicError(_)) => { - warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt); + warn!( + "subscribe failed on {} after {} attempts - retrying", + grpc_source, attempt + ); ConnectionState::RecoverableConnectionError(attempt + 1) } Err(GeyserGrpcClientError::TonicStatus(_)) => { - warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt); + warn!( + "subscribe failed on {} after {} attempts - retrying", + grpc_source, attempt + ); ConnectionState::RecoverableConnectionError(attempt + 1) } // non-recoverable @@ -161,7 +170,10 @@ pub fn create_geyser_autoconnection_task_with_mpsc( "subscribe to {} failed with unrecoverable error: {}", grpc_source, unrecoverable_error ); - ConnectionState::FatalError(attempt + 1, FatalErrorReason::SubscribeError) + ConnectionState::FatalError( + attempt + 1, + FatalErrorReason::SubscribeError, + ) } } }