Changing Notify channel to broadcast channel

This commit is contained in:
godmodegalactus 2024-04-02 14:36:22 +02:00 committed by GroovieGermanikus
parent 7f115fb8b5
commit 280f03d4d5
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
3 changed files with 16 additions and 20 deletions

View File

@ -2,8 +2,7 @@ use log::info;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::sync::broadcast;
use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task;
@ -89,7 +88,7 @@ pub async fn main() {
info!("Write Block stream..");
let exit_notify = Arc::new(Notify::new());
let (_, exit_notify) = broadcast::channel(1);
let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
green_config.clone(),

View File

@ -2,8 +2,6 @@ use log::{info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::sync::Arc;
use tokio::sync::Notify;
use base64::Engine;
use itertools::Itertools;
@ -120,7 +118,7 @@ pub async fn main() {
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let exit_notify = Arc::new(Notify::new());
let (_, exit_notify) = tokio::sync::broadcast::channel(1);
let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
@ -134,19 +132,19 @@ pub async fn main() {
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc(
blue_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc(
toxiproxy_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notify,
);
start_example_blockmeta_consumer(blockmeta_rx);

View File

@ -1,11 +1,10 @@
use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message};
use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Notify;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout, Instant};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError};
@ -36,7 +35,7 @@ enum FatalErrorReason {
pub fn create_geyser_autoconnection_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
exit_notify: Arc<Notify>,
exit_notify: broadcast::Receiver<()>,
) -> (JoinHandle<()>, Receiver<Message>) {
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);
@ -57,7 +56,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
mpsc_downstream: tokio::sync::mpsc::Sender<Message>,
exit_notify: Arc<Notify>,
mut exit_notify: broadcast::Receiver<()>,
) -> JoinHandle<()> {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
@ -98,7 +97,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
@ -157,7 +156,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
@ -220,7 +219,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
_ = sleep(Duration::from_secs_f32(backoff_secs)) => {
//slept
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
@ -254,7 +253,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
_ = sleep(Duration::from_secs_f32(backoff_secs)) => {
//slept
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
@ -270,7 +269,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
@ -293,7 +292,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};
@ -320,7 +319,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
res = mpsc_downstream.send(the_message)=> {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break 'main_loop;
}
};