diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs index c6edbdb..dcc4e7b 100644 --- a/examples/stream_blocks_autoconnect.rs +++ b/examples/stream_blocks_autoconnect.rs @@ -5,16 +5,9 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; -use geyser_grpc_connector::channel_plugger::{ - spawn_broadcast_channel_plug, spawn_plugger_mpcs_to_broadcast, -}; -use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; -use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{ - create_geyser_autoconnection_task, -}; -use geyser_grpc_connector::grpcmultiplex_fastestwins::{ - create_multiplexed_stream, FromYellowstoneExtractor, -}; +use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug; +use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task; +use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor; use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message}; use tokio::time::{sleep, Duration}; use tracing::warn; @@ -22,20 +15,6 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::prost::Message as _; -fn start_example_blockmini_consumer( - multiplex_stream: impl Stream + Send + 'static, -) { - tokio::spawn(async move { - let mut blockmeta_stream = pin!(multiplex_stream); - while let Some(mini) = blockmeta_stream.next().await { - info!( - "emitted block mini #{}@{} with {} bytes from multiplexer", - mini.slot, mini.commitment_config.commitment, mini.blocksize - ); - } - }); -} - pub struct BlockMini { pub blocksize: usize, pub slot: Slot, @@ -73,7 +52,7 @@ impl FromYellowstoneExtractor for BlockMiniExtractor { } } -#[warn(dead_code)] +#[allow(dead_code)] enum TestCases { Basic, SlowReceiverStartup, @@ -102,6 +81,7 @@ pub async fn main() { connect_timeout: Duration::from_secs(5), request_timeout: Duration::from_secs(5), subscribe_timeout: Duration::from_secs(5), + receive_timeout: Duration::from_secs(5), }; let green_config = diff --git a/examples/stream_blocks_mainnet.rs b/examples/stream_blocks_mainnet.rs index a8e0836..e2c2731 100644 --- a/examples/stream_blocks_mainnet.rs +++ b/examples/stream_blocks_mainnet.rs @@ -129,6 +129,7 @@ pub async fn main() { connect_timeout: Duration::from_secs(5), request_timeout: Duration::from_secs(5), subscribe_timeout: Duration::from_secs(5), + receive_timeout: Duration::from_secs(5), }; let green_config = diff --git a/examples/stream_blocks_single.rs b/examples/stream_blocks_single.rs index fd1ecb2..9b1e85c 100644 --- a/examples/stream_blocks_single.rs +++ b/examples/stream_blocks_single.rs @@ -5,9 +5,7 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; -use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{ - create_geyser_reconnecting_stream, -}; +use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, }; @@ -88,10 +86,10 @@ pub async fn main() { connect_timeout: Duration::from_secs(5), request_timeout: Duration::from_secs(5), subscribe_timeout: Duration::from_secs(5), + receive_timeout: Duration::from_secs(5), }; - let config = - GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); + let config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); info!("Write Block stream.."); @@ -156,7 +154,6 @@ pub async fn main() { sleep(Duration::from_secs(1800)).await; } - fn map_block_update(update: SubscribeUpdate) -> Option { match update.update_oneof { Some(UpdateOneof::Block(update_block_message)) => { diff --git a/src/channel_plugger.rs b/src/channel_plugger.rs index b3b3ee8..9ba1628 100644 --- a/src/channel_plugger.rs +++ b/src/channel_plugger.rs @@ -2,8 +2,6 @@ use log::{debug, info, warn}; use std::time::Duration; use tokio::sync::broadcast::error::RecvError; use tokio::sync::mpsc::error::SendTimeoutError; -use tokio::time::{sleep, timeout}; -use crate::grpcmultiplex_fastestwins::FromYellowstoneExtractor; /// usage: see plug_pattern test pub fn spawn_broadcast_channel_plug( @@ -43,11 +41,12 @@ pub fn spawn_plugger_mpcs_to_broadcast( #[cfg(test)] mod tests { use super::*; + use tokio::time::{sleep, timeout}; #[tokio::test] async fn plug_pattern() { - let (jh_task, message_channel) = tokio::sync::mpsc::channel::(1); - let broadcast_rx = + let (_jh_task, message_channel) = tokio::sync::mpsc::channel::(1); + let _broadcast_rx = spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel); } diff --git a/src/grpc_subscription.rs b/src/grpc_subscription.rs index e4059f4..0d0d1ce 100644 --- a/src/grpc_subscription.rs +++ b/src/grpc_subscription.rs @@ -1,15 +1,10 @@ -// use crate::{ -// endpoint_stremers::EndpointStreaming, -// rpc_polling::vote_accounts_and_cluster_info_polling::poll_vote_accounts_and_cluster_info, -// }; use anyhow::{bail, Context}; -use futures::{Stream, StreamExt}; +use futures::StreamExt; use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; -use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::broadcast::Sender; use yellowstone_grpc_client::GeyserGrpcClient; -use yellowstone_grpc_proto::geyser::SubscribeRequest; use yellowstone_grpc_proto::prelude::{ subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeUpdateBlock, diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index 7d403a6..e070292 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -1,14 +1,12 @@ use crate::{Attempt, GrpcSourceConfig, Message}; use async_stream::stream; use futures::{Stream, StreamExt}; -use log::{debug, error, info, log, trace, warn, Level}; -use std::fmt::{Debug, Display}; +use log::{debug, info, log, trace, warn, Level}; use std::time::Duration; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult}; use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; -use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::Status; enum ConnectionState>> { @@ -143,6 +141,7 @@ mod tests { connect_timeout: Duration::from_secs(1), request_timeout: Duration::from_secs(2), subscribe_timeout: Duration::from_secs(3), + receive_timeout: Duration::from_secs(3), }; assert_eq!( format!( @@ -164,6 +163,7 @@ mod tests { connect_timeout: Duration::from_secs(1), request_timeout: Duration::from_secs(2), subscribe_timeout: Duration::from_secs(3), + receive_timeout: Duration::from_secs(3), }; assert_eq!( format!( diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 66d6d61..fbc6354 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,14 +1,11 @@ use crate::{GrpcSourceConfig, Message}; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; -use std::fmt::{Debug, Display}; -use std::future::Future; use std::time::Duration; use tokio::sync::mpsc::error::SendTimeoutError; use tokio::sync::mpsc::Receiver; use tokio::task::AbortHandle; use tokio::time::{sleep, timeout, Instant}; -use tokio::time::error::Elapsed; use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError}; use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::service::Interceptor; @@ -214,7 +211,12 @@ pub fn create_geyser_autoconnection_task( ConnectionState::Ready(attempt, mut geyser_stream) => { let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); 'recv_loop: loop { - match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await { + match timeout( + receive_timeout.unwrap_or(Duration::MAX), + geyser_stream.next(), + ) + .await + { Ok(Some(Ok(update_message))) => { trace!("> recv update message from {}", grpc_source); // note: first send never blocks as the mpsc channel has capacity 1 @@ -309,6 +311,7 @@ mod tests { connect_timeout: Duration::from_secs(1), request_timeout: Duration::from_secs(2), subscribe_timeout: Duration::from_secs(3), + receive_timeout: Duration::from_secs(3), }; assert_eq!( format!( @@ -330,6 +333,7 @@ mod tests { connect_timeout: Duration::from_secs(1), request_timeout: Duration::from_secs(2), subscribe_timeout: Duration::from_secs(3), + receive_timeout: Duration::from_secs(3), }; assert_eq!( format!( diff --git a/src/grpcmultiplex_fastestwins.rs b/src/grpcmultiplex_fastestwins.rs index b0f36f0..78dbde1 100644 --- a/src/grpcmultiplex_fastestwins.rs +++ b/src/grpcmultiplex_fastestwins.rs @@ -1,11 +1,11 @@ use crate::Message; +use crate::Message::GeyserSubscribeUpdate; use async_stream::stream; use futures::{Stream, StreamExt}; use log::{info, warn}; use merge_streams::MergeStreams; use solana_sdk::clock::Slot; use yellowstone_grpc_proto::geyser::SubscribeUpdate; -use crate::Message::GeyserSubscribeUpdate; pub trait FromYellowstoneExtractor { // Target is something like ProducedBlock diff --git a/src/lib.rs b/src/lib.rs index dd3f2ad..2a493a9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,10 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::time::Duration; -use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate}; +use yellowstone_grpc_proto::geyser::{ + CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, + SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate, +}; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; pub mod channel_plugger; @@ -28,6 +31,7 @@ pub struct GrpcConnectionTimeouts { pub connect_timeout: Duration, pub request_timeout: Duration, pub subscribe_timeout: Duration, + pub receive_timeout: Duration, } #[derive(Clone)] @@ -127,10 +131,12 @@ impl GeyserFilter { pub fn slots(&self) -> SubscribeRequest { let mut slots_subs = HashMap::new(); - slots_subs.insert("client".to_string(), - SubscribeRequestFilterSlots { - filter_by_commitment: Some(true), - }); + slots_subs.insert( + "client".to_string(), + SubscribeRequestFilterSlots { + filter_by_commitment: Some(true), + }, + ); SubscribeRequest { slots: slots_subs,