From ce6ca26028c4466e0236657a76b9db2cccf4d535 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Wed, 27 Mar 2024 17:47:25 +0100 Subject: [PATCH] Adding notify channel to stop tasks --- examples/stream_blocks_autoconnect.rs | 5 + examples/stream_blocks_mainnet_task.rs | 15 +- ...grpc_subscription_autoreconnect_streams.rs | 5 +- src/grpc_subscription_autoreconnect_tasks.rs | 132 ++++++++++++------ src/yellowstone_grpc_util.rs | 35 ++--- 5 files changed, 127 insertions(+), 65 deletions(-) diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs index 6b3a40c..02ddda6 100644 --- a/examples/stream_blocks_autoconnect.rs +++ b/examples/stream_blocks_autoconnect.rs @@ -2,6 +2,8 @@ use log::info; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use std::env; +use std::sync::Arc; +use tokio::sync::Notify; use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug; use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task; @@ -87,9 +89,12 @@ pub async fn main() { info!("Write Block stream.."); + let exit_notify = Arc::new(Notify::new()); + let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task( green_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), + exit_notify, ); let mut message_channel = spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel); diff --git a/examples/stream_blocks_mainnet_task.rs b/examples/stream_blocks_mainnet_task.rs index 24c5eb4..265bbdf 100644 --- a/examples/stream_blocks_mainnet_task.rs +++ b/examples/stream_blocks_mainnet_task.rs @@ -1,8 +1,9 @@ -use futures::Stream; use log::{info, warn}; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use std::env; +use std::sync::Arc; +use tokio::sync::Notify; use base64::Engine; use itertools::Itertools; @@ -21,12 +22,8 @@ use solana_sdk::transaction::TransactionError; use tokio::sync::mpsc::Receiver; use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock; -use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{ - create_geyser_autoconnection_task, create_geyser_autoconnection_task_with_mpsc, -}; -use geyser_grpc_connector::grpcmultiplex_fastestwins::{ - create_multiplexed_stream, FromYellowstoneExtractor, -}; +use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc; +use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor; use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message}; use tokio::time::{sleep, Duration}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; @@ -123,6 +120,7 @@ pub async fn main() { subscribe_timeout: Duration::from_secs(5), receive_timeout: Duration::from_secs(5), }; + let exit_notify = Arc::new(Notify::new()); let green_config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); @@ -136,16 +134,19 @@ pub async fn main() { green_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), autoconnect_tx.clone(), + exit_notify.clone(), ); let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc( blue_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), autoconnect_tx.clone(), + exit_notify.clone(), ); let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc( toxiproxy_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), autoconnect_tx.clone(), + exit_notify.clone(), ); start_example_blockmeta_consumer(blockmeta_rx); diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index e53920c..3013f87 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -1,11 +1,11 @@ -use crate::{Attempt, GrpcSourceConfig, Message, yellowstone_grpc_util}; +use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message}; use async_stream::stream; use futures::{Stream, StreamExt}; use log::{debug, info, log, trace, warn, Level}; use std::time::Duration; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; -use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult}; +use yellowstone_grpc_client::GeyserGrpcClientResult; use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::Status; @@ -22,7 +22,6 @@ pub fn create_geyser_reconnecting_stream( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, ) -> impl Stream { - let mut state = ConnectionState::NotConnected(1); // in case of cancellation, we restart from here: diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 9fd3aca..37d37f2 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,10 +1,12 @@ -use crate::{Attempt, GrpcSourceConfig, Message, yellowstone_grpc_util}; +use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message}; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; +use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::error::SendTimeoutError; use tokio::sync::mpsc::Receiver; -use tokio::task::AbortHandle; +use tokio::sync::Notify; +use tokio::task::JoinHandle; use tokio::time::{sleep, timeout, Instant}; use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError}; use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; @@ -33,13 +35,18 @@ enum FatalErrorReason { pub fn create_geyser_autoconnection_task( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, -) -> (AbortHandle, Receiver) { + exit_notify: Arc, +) -> (JoinHandle<()>, Receiver) { let (sender, receiver_channel) = tokio::sync::mpsc::channel::(1); - let abort_handle = - create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender); + let join_handle = create_geyser_autoconnection_task_with_mpsc( + grpc_source, + subscribe_filter, + sender, + exit_notify, + ); - (abort_handle, receiver_channel) + (join_handle, receiver_channel) } /// connect to grpc source performing autoconnect if required, @@ -49,16 +56,16 @@ pub fn create_geyser_autoconnection_task_with_mpsc( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, mpsc_downstream: tokio::sync::mpsc::Sender, -) -> AbortHandle { + exit_notify: Arc, +) -> JoinHandle<()> { // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ - // task will be aborted when downstream receiver gets dropped let jh_geyser_task = tokio::spawn(async move { let mut state = ConnectionState::NotConnected(1); let mut messages_forwarded = 0; - loop { + 'main_loop: loop { state = match state { ConnectionState::NotConnected(attempt) => { let addr = grpc_source.grpc_addr.clone(); @@ -79,15 +86,21 @@ pub fn create_geyser_autoconnection_task_with_mpsc( let buffer_config = yellowstone_grpc_util::GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter); debug!("Using Grpc Buffer config {:?}", buffer_config); - let connect_result = yellowstone_grpc_util::connect_with_timeout_with_buffers( - addr, - token, - config, - connect_timeout, - request_timeout, - buffer_config, - ) - .await; + let connect_result = tokio::select! { + res = yellowstone_grpc_util::connect_with_timeout_with_buffers( + addr, + token, + config, + connect_timeout, + request_timeout, + buffer_config, + ) => { + res + }, + _ = exit_notify.notified() => { + break 'main_loop; + } + }; match connect_result { Ok(client) => ConnectionState::Connecting(attempt, client), @@ -136,13 +149,17 @@ pub fn create_geyser_autoconnection_task_with_mpsc( let subscribe_filter = subscribe_filter.clone(); debug!("Subscribe with filter {:?}", subscribe_filter); - - - let subscribe_result_timeout = timeout( - subscribe_timeout.unwrap_or(Duration::MAX), - client.subscribe_once2(subscribe_filter), - ) - .await; + let subscribe_result_timeout = tokio::select! { + res = timeout( + subscribe_timeout.unwrap_or(Duration::MAX), + client.subscribe_once2(subscribe_filter), + ) => { + res + }, + _ = exit_notify.notified() => { + break 'main_loop; + } + }; match subscribe_result_timeout { Ok(subscribe_result) => { @@ -198,7 +215,14 @@ pub fn create_geyser_autoconnection_task_with_mpsc( "waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source ); - sleep(Duration::from_secs_f32(backoff_secs)).await; + tokio::select! { + _ = sleep(Duration::from_secs_f32(backoff_secs)) => { + //slept + }, + _ = exit_notify.notified() => { + break 'main_loop; + } + }; ConnectionState::NotConnected(attempt) } ConnectionState::FatalError(_attempt, reason) => match reason { @@ -225,18 +249,31 @@ pub fn create_geyser_autoconnection_task_with_mpsc( "waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source ); - sleep(Duration::from_secs_f32(backoff_secs)).await; + tokio::select! { + _ = sleep(Duration::from_secs_f32(backoff_secs)) => { + //slept + }, + _ = exit_notify.notified() => { + break 'main_loop; + } + }; ConnectionState::NotConnected(attempt) } ConnectionState::Ready(mut geyser_stream) => { let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); 'recv_loop: loop { - match timeout( - receive_timeout.unwrap_or(Duration::MAX), - geyser_stream.next(), - ) - .await - { + let geyser_stream_res = tokio::select! { + res = timeout( + receive_timeout.unwrap_or(Duration::MAX), + geyser_stream.next(), + ) => { + res + }, + _ = exit_notify.notified() => { + break 'main_loop; + } + }; + match geyser_stream_res { Ok(Some(Ok(update_message))) => { trace!("> recv update message from {}", grpc_source); // note: first send never blocks as the mpsc channel has capacity 1 @@ -246,13 +283,21 @@ pub fn create_geyser_autoconnection_task_with_mpsc( Duration::from_millis(500) }; let started_at = Instant::now(); - match mpsc_downstream + + let mpsc_downstream_result = tokio::select! { + res = mpsc_downstream .send_timeout( Message::GeyserSubscribeUpdate(Box::new(update_message)), warning_threshold, - ) - .await - { + ) => { + res + }, + _ = exit_notify.notified() => { + break 'main_loop; + } + }; + + match mpsc_downstream_result { Ok(()) => { messages_forwarded += 1; if messages_forwarded == 1 { @@ -270,7 +315,16 @@ pub fn create_geyser_autoconnection_task_with_mpsc( Err(SendTimeoutError::Timeout(the_message)) => { warn!("downstream receiver did not pick up message for {}ms - keep waiting", warning_threshold.as_millis()); - match mpsc_downstream.send(the_message).await { + let mpsc_downstream_result = tokio::select! { + res = mpsc_downstream.send(the_message)=> { + res + }, + _ = exit_notify.notified() => { + break 'main_loop; + } + }; + + match mpsc_downstream_result { Ok(()) => { messages_forwarded += 1; trace!( @@ -317,7 +371,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( } // -- endless state loop }); - jh_geyser_task.abort_handle() + jh_geyser_task } #[cfg(test)] diff --git a/src/yellowstone_grpc_util.rs b/src/yellowstone_grpc_util.rs index bb5d4b9..587fb9d 100644 --- a/src/yellowstone_grpc_util.rs +++ b/src/yellowstone_grpc_util.rs @@ -5,12 +5,11 @@ use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient; use yellowstone_grpc_proto::geyser::SubscribeRequest; use yellowstone_grpc_proto::prost::bytes::Bytes; use yellowstone_grpc_proto::tonic; -use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue; use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue; +use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue; use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; - pub async fn connect_with_timeout( endpoint: E, x_token: Option, @@ -19,15 +18,21 @@ pub async fn connect_with_timeout( request_timeout: Option, connect_lazy: bool, ) -> GeyserGrpcClientResult> - where - E: Into, - T: TryInto, +where + E: Into, + T: TryInto, { GeyserGrpcClient::connect_with_timeout( - endpoint, x_token, tls_config, connect_timeout, request_timeout, connect_lazy).await + endpoint, + x_token, + tls_config, + connect_timeout, + request_timeout, + connect_lazy, + ) + .await } - // see https://github.com/hyperium/tonic/blob/v0.10.2/tonic/src/transport/channel/mod.rs const DEFAULT_BUFFER_SIZE: usize = 1024; // see https://github.com/hyperium/hyper/blob/v0.14.28/src/proto/h2/client.rs#L45 @@ -52,22 +57,19 @@ impl Default for GeyserGrpcClientBufferConfig { } impl GeyserGrpcClientBufferConfig { - pub fn optimize_for_subscription(filter: &SubscribeRequest) -> GeyserGrpcClientBufferConfig { if !filter.blocks.is_empty() { GeyserGrpcClientBufferConfig { - buffer_size: Some(65536), // 64kb (default: 1k) - conn_window: Some(5242880), // 5mb (=default) + buffer_size: Some(65536), // 64kb (default: 1k) + conn_window: Some(5242880), // 5mb (=default) stream_window: Some(4194304), // 4mb (default: 2m) } } else { GeyserGrpcClientBufferConfig::default() } } - } - pub async fn connect_with_timeout_with_buffers( endpoint: E, x_token: Option, @@ -76,9 +78,10 @@ pub async fn connect_with_timeout_with_buffers( request_timeout: Option, buffer_config: GeyserGrpcClientBufferConfig, ) -> GeyserGrpcClientResult> - where - E: Into, - T: TryInto, { +where + E: Into, + T: TryInto, +{ // see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10 let mut endpoint = tonic::transport::Endpoint::from_shared(endpoint)? .buffer_size(buffer_config.buffer_size) @@ -110,4 +113,4 @@ pub async fn connect_with_timeout_with_buffers( .max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()), ); Ok(client) -} \ No newline at end of file +}