Changing Notify channel to broadcast channel
This commit is contained in:
parent
ce6ca26028
commit
688e4d241d
|
@ -2,8 +2,7 @@ use log::info;
|
|||
use solana_sdk::clock::Slot;
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use std::env;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug;
|
||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task;
|
||||
|
@ -89,7 +88,7 @@ pub async fn main() {
|
|||
|
||||
info!("Write Block stream..");
|
||||
|
||||
let exit_notify = Arc::new(Notify::new());
|
||||
let (_, exit_notify) = broadcast::channel(1);
|
||||
|
||||
let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
|
||||
green_config.clone(),
|
||||
|
|
|
@ -2,8 +2,6 @@ use log::{info, warn};
|
|||
use solana_sdk::clock::Slot;
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use std::env;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use base64::Engine;
|
||||
use itertools::Itertools;
|
||||
|
@ -120,7 +118,7 @@ pub async fn main() {
|
|||
subscribe_timeout: Duration::from_secs(5),
|
||||
receive_timeout: Duration::from_secs(5),
|
||||
};
|
||||
let exit_notify = Arc::new(Notify::new());
|
||||
let (_, exit_notify) = tokio::sync::broadcast::channel(1);
|
||||
|
||||
let green_config =
|
||||
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
|
||||
|
@ -134,19 +132,19 @@ pub async fn main() {
|
|||
green_config.clone(),
|
||||
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
|
||||
autoconnect_tx.clone(),
|
||||
exit_notify.clone(),
|
||||
exit_notify.resubscribe(),
|
||||
);
|
||||
let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc(
|
||||
blue_config.clone(),
|
||||
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
|
||||
autoconnect_tx.clone(),
|
||||
exit_notify.clone(),
|
||||
exit_notify.resubscribe(),
|
||||
);
|
||||
let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc(
|
||||
toxiproxy_config.clone(),
|
||||
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
|
||||
autoconnect_tx.clone(),
|
||||
exit_notify.clone(),
|
||||
exit_notify,
|
||||
);
|
||||
start_example_blockmeta_consumer(blockmeta_rx);
|
||||
|
||||
|
|
|
@ -1,11 +1,10 @@
|
|||
use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message};
|
||||
use futures::{Stream, StreamExt};
|
||||
use log::{debug, error, info, log, trace, warn, Level};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc::error::SendTimeoutError;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{sleep, timeout, Instant};
|
||||
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError};
|
||||
|
@ -35,7 +34,7 @@ enum FatalErrorReason {
|
|||
pub fn create_geyser_autoconnection_task(
|
||||
grpc_source: GrpcSourceConfig,
|
||||
subscribe_filter: SubscribeRequest,
|
||||
exit_notify: Arc<Notify>,
|
||||
exit_notify: broadcast::Receiver<()>,
|
||||
) -> (JoinHandle<()>, Receiver<Message>) {
|
||||
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);
|
||||
|
||||
|
@ -56,7 +55,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
|
|||
grpc_source: GrpcSourceConfig,
|
||||
subscribe_filter: SubscribeRequest,
|
||||
mpsc_downstream: tokio::sync::mpsc::Sender<Message>,
|
||||
exit_notify: Arc<Notify>,
|
||||
mut exit_notify: broadcast::Receiver<()>,
|
||||
) -> JoinHandle<()> {
|
||||
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
|
||||
|
||||
|
@ -97,7 +96,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
|
|||
) => {
|
||||
res
|
||||
},
|
||||
_ = exit_notify.notified() => {
|
||||
_ = exit_notify.recv() => {
|
||||
break 'main_loop;
|
||||
}
|
||||
};
|
||||
|
@ -156,7 +155,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
|
|||
) => {
|
||||
res
|
||||
},
|
||||
_ = exit_notify.notified() => {
|
||||
_ = exit_notify.recv() => {
|
||||
break 'main_loop;
|
||||
}
|
||||
};
|
||||
|
@ -219,7 +218,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
|
|||
_ = sleep(Duration::from_secs_f32(backoff_secs)) => {
|
||||
//slept
|
||||
},
|
||||
_ = exit_notify.notified() => {
|
||||
_ = exit_notify.recv() => {
|
||||
break 'main_loop;
|
||||
}
|
||||
};
|
||||
|
@ -253,7 +252,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
|
|||
_ = sleep(Duration::from_secs_f32(backoff_secs)) => {
|
||||
//slept
|
||||
},
|
||||
_ = exit_notify.notified() => {
|
||||
_ = exit_notify.recv() => {
|
||||
break 'main_loop;
|
||||
}
|
||||
};
|
||||
|
@ -269,7 +268,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
|
|||
) => {
|
||||
res
|
||||
},
|
||||
_ = exit_notify.notified() => {
|
||||
_ = exit_notify.recv() => {
|
||||
break 'main_loop;
|
||||
}
|
||||
};
|
||||
|
@ -292,7 +291,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
|
|||
) => {
|
||||
res
|
||||
},
|
||||
_ = exit_notify.notified() => {
|
||||
_ = exit_notify.recv() => {
|
||||
break 'main_loop;
|
||||
}
|
||||
};
|
||||
|
@ -319,7 +318,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
|
|||
res = mpsc_downstream.send(the_message)=> {
|
||||
res
|
||||
},
|
||||
_ = exit_notify.notified() => {
|
||||
_ = exit_notify.recv() => {
|
||||
break 'main_loop;
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue