minor cleanup

This commit is contained in:
GroovieGermanikus 2024-01-22 10:55:03 +01:00
parent 87a4bbc484
commit 6e15250b96
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 9 additions and 24 deletions

View File

@ -5,6 +5,9 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::pin::pin;
use geyser_grpc_connector::channel_plugger::{
spawn_broadcast_channel_plug, spawn_plugger_mpcs_to_broadcast,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{
create_geyser_autoconnection_task, Message,
@ -18,7 +21,6 @@ use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prost::Message as _;
use geyser_grpc_connector::channel_plugger::spawn_plugger_mpcs_to_broadcast;
fn start_example_blockmini_consumer(
multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static,
@ -79,7 +81,7 @@ enum TestCases {
CloseAfterReceiving,
AbortTaskFromOutside,
}
const TEST_CASE: TestCases = TestCases::TemporaryLaggingReceiver;
const TEST_CASE: TestCases = TestCases::Basic;
#[tokio::main]
pub async fn main() {
@ -107,13 +109,12 @@ pub async fn main() {
info!("Write Block stream..");
let (broadcast_tx, broadcast_rx) = tokio::sync::broadcast::channel(100);
let (jh_geyser_task, mut message_channel) = create_geyser_autoconnection_task(
let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
);
spawn_plugger_mpcs_to_broadcast(message_channel, broadcast_tx);
let mut message_channel = broadcast_rx;
let mut message_channel =
spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel);
tokio::spawn(async move {
if let TestCases::SlowReceiverStartup = TEST_CASE {
@ -131,8 +132,7 @@ pub async fn main() {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
message_count += 1;
// info!("got update: {:?}", subscriber_update.update_oneof.);
info!("got update!!!");
info!("got update - {} bytes", subscriber_update.encoded_len());
if let TestCases::CloseAfterReceiving = TEST_CASE {
info!("(testcase) closing stream after receiving");
@ -155,5 +155,5 @@ pub async fn main() {
});
// "infinite" sleep
sleep(Duration::from_secs(1800)).await;
sleep(Duration::from_secs(2000)).await;
}

View File

@ -295,21 +295,6 @@ pub fn create_geyser_autoconnection_task(
);
}
}
// {
// Ok(n_subscribers) => {
// trace!(
// "sent update message to {} subscribers (buffer={})",
// n_subscribers,
// sender.len()
// );
// continue 'recv_loop;
// }
// Err(SendError(_)) => {
// // note: error does not mean that future sends will also fail!
// trace!("no subscribers for update message");
// continue 'recv_loop;
// }
// };
}
Some(Err(tonic_status)) => {
// all tonic errors are recoverable